This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new a77fec639 IMPALA-13661: Support parallelism above JDBC tables for 
joins/aggregates
a77fec639 is described below

commit a77fec6391eaf5db7c52000c7446cc8582c770ac
Author: pranav.lodha <[email protected]>
AuthorDate: Wed Jul 2 18:59:59 2025 +0530

    IMPALA-13661: Support parallelism above JDBC tables for joins/aggregates
    
    Impala's planner generates a single-fragment, single-
    threaded scan node for queries on JDBC tables because table
    statistics are not properly available from the external
    JDBC source. As a result, even large JDBC tables are
    executed serially, causing suboptimal performance for joins,
    aggregations, and scans over millions of rows.
    
    This patch enables Impala to estimate the number of rows in a JDBC
    table by issuing a COUNT(*) query at query preparation time. The
    estimation is returned via TPrepareResult.setNum_rows_estimate()
    and propagated into DataSourceScanNode. The scan node then uses
    this cardinality to drive planner heuristics such as join order,
    fragment parallelization, and scanner thread selection.
    
    The design leverages the existing JDBC accessor layer:
    - JdbcDataSource.prepare() constructs the configuration and invokes
      GenericJdbcDatabaseAccessor.getTotalNumberOfRecords().
    - The accessor wraps the underlying query in:
          SELECT COUNT(*) FROM (<query>) tmptable
      ensuring correctness for both direct table scans and parameterized
      query strings.
    - The result is captured as num_rows_estimate, which is then applied
      during computeStats() in DataSourceScanNode.
    With accurate (or approximate) row counts, the planner can now:
    - Assign multiple scanner threads to JDBC scan nodes instead of
       falling back to a single-thread plan.
    - Introduce exchange nodes where beneficial, parallelizing data
       fetches across multiple JDBC connections.
    - Produce better join orders by comparing JDBC row cardinalities
       against native Impala tables.
    - Avoid severe underestimation that previously defaulted to wrong
       table statistics, leading to degenerate plans.
    
    For a sample join query mentioned in the test file,
    these are the improvements:
    
    Before Optimization:
    - Cardinality fixed at 1 for all JDBC scans
    - Single fragment, single thread per query
    - Max per-host resource reservation: ~9.7 MB, 1 thread
    - No EXCHANGE or MERGING EXCHANGE operators
    - No broadcast distribution; joins executed serially
    - Example query runtime: ~77s
    
    SCAN JDBC A
       \
        HASH JOIN
           \
            SCAN JDBC B
               \
                HASH JOIN
                   \
                    SCAN JDBC C
                       \
                        TOP-N -> ROOT
    
    After Optimization:
    - Cardinality derived from COUNT(*) (e.g. 150K, 1.5M rows)
    - Multiple fragments per scan, 7 threads per query
    - Max per-host resource reservation: ~123 MB, 7 threads
    - Plans include EXCHANGE and MERGING EXCHANGE operators
    - Broadcast joins on small sides, improving parallelism
    - Example query runtime: ~38s (~2x faster)
    
    SCAN JDBC A --> EXCHANGE(SND) --+
                                      \
                                       EXCHANGE(RCV) -> HASH JOIN(BCAST) --+
    SCAN JDBC B --> EXCHANGE(SND) ----/                                   \
                                                                             
HASH JOIN(BCAST) --+
    SCAN JDBC C --> EXCHANGE(SND) ------------------------------------------/   
              \
                                                                                
                 TOP-N
                                                                                
                   \
                                                                                
                    MERGING EXCHANGE -> ROOT
    
    Also added a new backend configuration flag
    --min_jdbc_scan_cardinality (default: 10) to provide a
    lower bound for scan node cardinality estimates
    during planning. This flag is propagated from BE
    to FE via TBackendGflags and surfaced through
    BackendConfig, ensuring the planner never produces
    unrealistically low cardinality values.
    
    TODO: Add a query option for this optimization
    to avoid extra JDBC round trip for smaller
    queries (IMPALA-14417).
    
    Testing: All cases of Planner tests are written in
    jdbc-parallel.test. Some basic metrics
    are also mentioned in the commit message.
    
    Change-Id: If47d29bdda5b17a1b369440f04d4e209d12133d9
    Reviewed-on: http://gerrit.cloudera.org:8080/23112
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Wenzhe Zhou <[email protected]>
---
 be/src/common/global-flags.cc                      |    2 +
 be/src/util/backend-gflag-util.cc                  |    2 +
 common/thrift/BackendGflags.thrift                 |    2 +
 common/thrift/ExternalDataSource.thrift            |    5 +
 .../impala/extdatasource/jdbc/JdbcDataSource.java  |   21 +-
 .../jdbc/conf/JdbcStorageConfigManager.java        |   27 -
 .../jdbc/dao/GenericJdbcDatabaseAccessor.java      |   22 +-
 .../apache/impala/planner/DataSourceScanNode.java  |   15 +-
 .../org/apache/impala/service/BackendConfig.java   |    7 +
 .../apache/impala/customcluster/LdapHS2Test.java   |    2 +-
 .../org/apache/impala/planner/PlannerTest.java     |    5 +
 .../queries/PlannerTest/jdbc-parallel.test         | 2134 ++++++++++++++++++++
 12 files changed, 2210 insertions(+), 34 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index ee2d5533a..f2272b24c 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -71,6 +71,8 @@ DEFINE_bool(skip_external_kerberos_auth, false,
 DEFINE_string(anonymous_user_name, "anonymous",
     "Default username used when a client connects to an unsecured impala 
daemon and "
     "does not specify a username.");
+DEFINE_int32(min_jdbc_scan_cardinality, 10,
+    "Lower bound for jdbc scan node cardinality estimates used by the FE 
planner.");
 
 static const string mem_limit_help_msg = "Limit on process memory consumption. 
"
     "Includes the JVM's memory consumption only if --mem_limit_includes_jvm is 
true. "
diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index 9f3c1b7a5..d90df897c 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -42,6 +42,7 @@ DECLARE_int32(max_hdfs_partitions_parallel_load);
 DECLARE_int32(max_nonhdfs_partitions_parallel_load);
 DECLARE_int32(initial_hms_cnxn_timeout_s);
 DECLARE_int32(kudu_operation_timeout_ms);
+DECLARE_int32(min_jdbc_scan_cardinality);
 DECLARE_int64(inc_stats_size_limit_bytes);
 DECLARE_string(principal);
 DECLARE_string(lineage_event_log_dir);
@@ -588,6 +589,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
       FLAGS_tuple_cache_cost_coefficient_read_bytes);
   cfg.__set_tuple_cache_cost_coefficient_read_rows(
       FLAGS_tuple_cache_cost_coefficient_read_rows);
+  cfg.__set_min_jdbc_scan_cardinality(FLAGS_min_jdbc_scan_cardinality);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index bd46940a2..07f7e65f3 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -357,4 +357,6 @@ struct TBackendGflags {
   163: required double tuple_cache_cost_coefficient_read_bytes
 
   164: required double tuple_cache_cost_coefficient_read_rows
+
+  165: required i32 min_jdbc_scan_cardinality
 }
diff --git a/common/thrift/ExternalDataSource.thrift 
b/common/thrift/ExternalDataSource.thrift
index 4e8e6d593..9656aa7dd 100644
--- a/common/thrift/ExternalDataSource.thrift
+++ b/common/thrift/ExternalDataSource.thrift
@@ -93,6 +93,11 @@ struct TPrepareParams {
   // A list of conjunctive (AND) clauses, each of which contains a list of
   // disjunctive (OR) binary predicates. Always set, may be an empty list.
   3: optional list<list<TBinaryPredicate>> predicates
+
+  // Indicate if external JDBC table handler should clean DBCP DataSource 
object from
+  // cache when its reference count equals 0. Note that the reference count is 
tracked
+  // across all queries for a given data source in the coordinator.
+  4: optional bool clean_dbcp_ds_cache
 }
 
 // Returned by prepare().
diff --git 
a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java 
b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
index e5ba0b09a..4852df2ce 100644
--- a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
+++ b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
@@ -130,8 +130,27 @@ public class JdbcDataSource implements ExternalDataSource {
               Lists.newArrayList("Invalid init_string value")));
     }
     List<Integer> acceptedPredicates = 
acceptedPredicates(params.getPredicates());
+    long numRecords = 0;
+    try {
+      dbAccessor_ = DatabaseAccessorFactory.getAccessor(tableConfig_);
+      numRecords = dbAccessor_.getTotalNumberOfRecords(tableConfig_);
+      LOG.info(String.format("Estimated number of records: %d", numRecords));
+    } catch (JdbcDatabaseAccessException e) {
+      return new TPrepareResult(
+          new TStatus(TErrorCode.RUNTIME_ERROR,
+              Lists.newArrayList("Failed to retrieve total number of records: "
+                  + e.getMessage())));
+    }
+    if (dbAccessor_ != null) {
+      if (params.isSetClean_dbcp_ds_cache()) {
+        cleanDbcpDSCache_ = params.isClean_dbcp_ds_cache();
+      }
+      dbAccessor_.close(null, cleanDbcpDSCache_);
+      dbAccessor_ = null;
+    }
     return new TPrepareResult(STATUS_OK)
-            .setAccepted_conjuncts(acceptedPredicates);
+            .setAccepted_conjuncts(acceptedPredicates)
+            .setNum_rows_estimate(numRecords);
   }
 
   @Override
diff --git 
a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfigManager.java
 
b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfigManager.java
index 1463330bd..f51a550c3 100644
--- 
a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfigManager.java
+++ 
b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfigManager.java
@@ -116,33 +116,6 @@ public class JdbcStorageConfigManager {
     return config.get(key.getPropertyName());
   }
 
-  public static String getOrigQueryToExecute(Configuration config) {
-    String query;
-    String tableName = config.get(JdbcStorageConfig.TABLE.getPropertyName());
-    if (tableName != null) {
-      // We generate query as 'select * from tbl'
-      query = "select * from " + tableName;
-    } else {
-      query = config.get(JdbcStorageConfig.QUERY.getPropertyName());
-    }
-
-    return query;
-  }
-
-  public static String getQueryToExecute(Configuration config) {
-    String query = config.get(JdbcStorageConfig.QUERY.getPropertyName());
-    if (query != null) {
-      // Query has been defined, return it
-      return query;
-    }
-
-    // We generate query as 'select * from tbl'
-    String tableName = config.get(JdbcStorageConfig.TABLE.getPropertyName());
-    query = "select * from " + tableName;
-
-    return query;
-  }
-
   private static boolean isEmptyString(String value) {
     return ((value == null) || (value.trim().isEmpty()));
   }
diff --git 
a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
 
b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
index ddcf972a8..fb82d217d 100644
--- 
a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
+++ 
b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
@@ -79,7 +79,7 @@ public class GenericJdbcDatabaseAccessor implements 
DatabaseAccessor {
 
     try {
       initializeDatabaseSource(conf);
-      String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
+      String sql = getQueryToExecute(conf);
       // TODO: If a target database cannot flatten this view query, try to text
       // replace the generated "select *".
       String countQuery = "SELECT COUNT(*) FROM (" + sql + ") tmptable";
@@ -116,7 +116,7 @@ public class GenericJdbcDatabaseAccessor implements 
DatabaseAccessor {
 
     try {
       initializeDatabaseSource(conf);
-      String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
+      String sql = getQueryToExecute(conf);
       String partitionQuery = addLimitAndOffsetToQuery(sql, limit, offset);
 
       LOG.info("Query to execute is [{}]", partitionQuery);
@@ -350,4 +350,22 @@ public class GenericJdbcDatabaseAccessor implements 
DatabaseAccessor {
         .getInt(JdbcStorageConfig.JDBC_FETCH_SIZE.getPropertyName(), 
DEFAULT_FETCH_SIZE);
   }
 
+  protected String getQueryToExecute(Configuration config) {
+    String query = config.get(JdbcStorageConfig.QUERY.getPropertyName());
+    if (query != null) {
+      // Query has been defined, return it
+      return query;
+    }
+
+    // We generate query as 'select * from tbl'
+    String tableName = config.get(JdbcStorageConfig.TABLE.getPropertyName());
+    // Make jdbc table name to be quoted with double quotes if columnMapping 
is not empty
+    String columnMapping = 
config.get(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName());
+    if (!Strings.isNullOrEmpty(columnMapping)) {
+      tableName = getCaseSensitiveName(tableName);
+    }
+    query = "select * from " + tableName;
+
+    return query;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
index bf5d9d144..79c5bd2ca 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
@@ -49,6 +49,7 @@ import org.apache.impala.extdatasource.thrift.TColumnDesc;
 import org.apache.impala.extdatasource.thrift.TComparisonOp;
 import org.apache.impala.extdatasource.thrift.TPrepareParams;
 import org.apache.impala.extdatasource.thrift.TPrepareResult;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TCacheJarResult;
 import org.apache.impala.thrift.TColumnValue;
@@ -102,7 +103,7 @@ public class DataSourceScanNode extends ScanNode {
   @Override
   public void init(Analyzer analyzer) throws ImpalaException {
     checkForSupportedFileFormats();
-    prepareDataSource();
+    prepareDataSource(analyzer.getQueryOptions());
     conjuncts_ = orderConjunctsByCost(conjuncts_);
     computeStats(analyzer);
     // materialize slots in remaining conjuncts_
@@ -179,7 +180,7 @@ public class DataSourceScanNode extends ScanNode {
    * stats. The accepted predicates are moved from conjuncts_ into 
acceptedConjuncts_
    * and the associated TBinaryPredicates are set in acceptedPredicates_.
    */
-  private void prepareDataSource() throws InternalException {
+  private void prepareDataSource(TQueryOptions queryOptions) throws 
InternalException {
     // Binary predicates that will be offered to the data source.
     List<List<TBinaryPredicate>> offeredPredicates = new ArrayList<>();
     // The index into conjuncts_ for each element in offeredPredicates.
@@ -223,6 +224,7 @@ public class DataSourceScanNode extends ScanNode {
       TPrepareParams prepareParams = new TPrepareParams();
       prepareParams.setInit_string(table_.getInitString());
       prepareParams.setPredicates(offeredPredicates);
+      
prepareParams.setClean_dbcp_ds_cache(queryOptions.isClean_dbcp_ds_cache());
       // TODO: Include DB (i.e. getFullName())?
       prepareParams.setTable_name(table_.getName());
       prepareResult = executor.prepare(prepareParams);
@@ -332,8 +334,15 @@ public class DataSourceScanNode extends ScanNode {
     super.computeStats(analyzer);
     inputCardinality_ = numRowsEstimate_;
     cardinality_ = numRowsEstimate_;
+    // Use estimate from the data source if present
+    if (numRowsEstimate_ > 0) {
+      cardinality_ = numRowsEstimate_;
+    } else {
+      cardinality_ = table_.getNumRows(); // fallback
+    }
     cardinality_ = applyConjunctsSelectivity(cardinality_);
-    cardinality_ = Math.max(1, cardinality_);
+    int minCard = BackendConfig.INSTANCE.getMinJdbcScanCardinality();
+    cardinality_ = Math.max(minCard, cardinality_);
     cardinality_ = capCardinalityAtLimit(cardinality_);
 
     if (LOG.isTraceEnabled()) {
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java 
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 65d4fc7e2..871d9d1b5 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -192,6 +192,13 @@ public class BackendConfig {
     return backendCfg_.blacklisted_dbs;
   }
 
+  public int getMinJdbcScanCardinality() {
+    if (backendCfg_.isSetMin_jdbc_scan_cardinality()) {
+      return backendCfg_.getMin_jdbc_scan_cardinality();
+    }
+    return 10;
+  }
+
   public String getBlacklistedTables() {
     return backendCfg_.blacklisted_tables;
   }
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java 
b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
index b7619f24b..eb76f9c75 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
@@ -956,6 +956,6 @@ public class LdapHS2Test {
         "Table has been dropped.");
 
     // Two successful authentications for each ExecAndFetch().
-    verifyMetrics(25, 0);
+    verifyMetrics(23, 0);
   }
 }
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java 
b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index b316f366c..9e3b21727 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -245,6 +245,11 @@ public class PlannerTest extends PlannerTestBase {
     runPlannerTestFile("hbase");
   }
 
+  @Test
+    public void testJdbcParallel() {
+      runPlannerTestFile("jdbc-parallel");
+    }
+
   /**
    * Test of HBase in the case of disabling the key scan.
    * Normally the HBase scan node goes out to HBase to query the
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/jdbc-parallel.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/jdbc-parallel.test
new file mode 100644
index 000000000..0a4b55ceb
--- /dev/null
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/jdbc-parallel.test
@@ -0,0 +1,2134 @@
+select id, int_col, bool_col, year, month from functional.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=17B cardinality=7.30K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional.alltypes]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   row-size=17B cardinality=7.30K
+====
+select c_custkey from tpch_jdbc.customer
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer]
+   row-size=8B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer]
+   row-size=8B cardinality=150.00K
+====
+
+select c_custkey, c_name from tpch_jdbc.customer where c_custkey < 1000
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer]
+   data source predicates: c_custkey < 1000
+   row-size=20B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer]
+   data source predicates: c_custkey < 1000
+   row-size=20B cardinality=150.00K
+====
+select o_orderkey, o_custkey from tpch_jdbc.orders where o_totalprice > 100000
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders]
+   data source predicates: o_totalprice > 100000
+   row-size=16B cardinality=1.50M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders]
+   data source predicates: o_totalprice > 100000
+   row-size=16B cardinality=1.50M
+====
+select a.c_custkey, b.o_orderkey from tpch_jdbc.customer
+a join tpch_jdbc.orders b on a.c_custkey=b.o_custkey
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.c_custkey = b.o_custkey
+|  row-size=24B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders b]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer a]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: a.c_custkey = b.o_custkey
+|  row-size=24B cardinality=150.00K
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders b]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer a]
+   row-size=0B cardinality=150.00K
+====
+select a.c_custkey, b.o_orderkey, c.l_linenumber from
+tpch_jdbc.customer a join tpch_jdbc.orders b on
+a.c_custkey=b.o_custkey join tpch_jdbc.lineitem c on
+b.o_orderkey=c.l_orderkey
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: b.o_orderkey = c.l_orderkey
+|  row-size=36B cardinality=150.00K
+|
+|--02:SCAN DATA SOURCE [tpch_jdbc.lineitem c]
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: a.c_custkey = b.o_custkey
+|  row-size=24B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders b]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer a]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: b.o_orderkey = c.l_orderkey
+|  row-size=36B cardinality=150.00K
+|
+|--06:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN DATA SOURCE [tpch_jdbc.lineitem c]
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: a.c_custkey = b.o_custkey
+|  row-size=24B cardinality=150.00K
+|
+|--05:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders b]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer a]
+   row-size=0B cardinality=150.00K
+====
+select a.c_name, count(b.o_orderkey) from tpch_jdbc.customer
+a left join tpch_jdbc.orders b on a.c_custkey=b.o_custkey
+group by a.c_name
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(b.o_orderkey)
+|  group by: a.c_name
+|  row-size=20B cardinality=150.00K
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: a.c_custkey = b.o_custkey
+|  row-size=36B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders b]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer a]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(b.o_orderkey)
+|  group by: a.c_name
+|  row-size=20B cardinality=150.00K
+|
+05:EXCHANGE [HASH(a.c_name)]
+|
+03:AGGREGATE [STREAMING]
+|  output: count(b.o_orderkey)
+|  group by: a.c_name
+|  row-size=20B cardinality=150.00K
+|
+02:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+|  hash predicates: a.c_custkey = b.o_custkey
+|  row-size=36B cardinality=150.00K
+|
+|--04:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders b]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer a]
+   row-size=0B cardinality=150.00K
+====
+select count(*) from tpch_jdbc.lineitem
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+00:SCAN DATA SOURCE [tpch_jdbc.lineitem]
+   row-size=0B cardinality=6.00M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  row-size=8B cardinality=1
+|
+02:EXCHANGE [UNPARTITIONED]
+|
+01:AGGREGATE
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+00:SCAN DATA SOURCE [tpch_jdbc.lineitem]
+   row-size=0B cardinality=6.00M
+====
+select avg(l_extendedprice) from tpch_jdbc.lineitem
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: avg(l_extendedprice)
+|  row-size=8B cardinality=1
+|
+00:SCAN DATA SOURCE [tpch_jdbc.lineitem]
+   row-size=0B cardinality=6.00M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: avg:merge(l_extendedprice)
+|  row-size=8B cardinality=1
+|
+02:EXCHANGE [UNPARTITIONED]
+|
+01:AGGREGATE
+|  output: avg(l_extendedprice)
+|  row-size=8B cardinality=1
+|
+00:SCAN DATA SOURCE [tpch_jdbc.lineitem]
+   row-size=0B cardinality=6.00M
+====
+select c_custkey, c_name from tpch_jdbc.customer order by c_custkey limit 50
+---- PLAN
+PLAN-ROOT SINK
+|
+01:TOP-N [LIMIT=50]
+|  order by: c_custkey ASC
+|  row-size=20B cardinality=50
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+02:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: c_custkey ASC
+|  limit: 50
+|
+01:TOP-N [LIMIT=50]
+|  order by: c_custkey ASC
+|  row-size=20B cardinality=50
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer]
+   row-size=0B cardinality=150.00K
+====
+select l_orderkey, sum(l_extendedprice) from tpch_jdbc.lineitem group by 
l_orderkey
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum(l_extendedprice)
+|  group by: l_orderkey
+|  row-size=24B cardinality=6.00M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.lineitem]
+   row-size=0B cardinality=6.00M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: sum:merge(l_extendedprice)
+|  group by: l_orderkey
+|  row-size=24B cardinality=6.00M
+|
+02:EXCHANGE [HASH(l_orderkey)]
+|
+01:AGGREGATE [STREAMING]
+|  output: sum(l_extendedprice)
+|  group by: l_orderkey
+|  row-size=24B cardinality=6.00M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.lineitem]
+   row-size=0B cardinality=6.00M
+====
+select n_name, count(*) from tpch_jdbc.nation group by n_name
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  group by: n_name
+|  row-size=20B cardinality=25
+|
+00:SCAN DATA SOURCE [tpch_jdbc.nation]
+   row-size=0B cardinality=25
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: n_name
+|  row-size=20B cardinality=25
+|
+02:EXCHANGE [HASH(n_name)]
+|
+01:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: n_name
+|  row-size=20B cardinality=25
+|
+00:SCAN DATA SOURCE [tpch_jdbc.nation]
+   row-size=0B cardinality=25
+====
+select r_name, n_name from tpch_jdbc.region r join
+tpch_jdbc.nation n on r.r_regionkey=n.n_regionkey
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: r.r_regionkey = n.n_regionkey
+|  row-size=28B cardinality=10
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.nation n]
+|     row-size=0B cardinality=25
+|
+00:SCAN DATA SOURCE [tpch_jdbc.region r]
+   row-size=0B cardinality=10
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: r.r_regionkey = n.n_regionkey
+|  row-size=28B cardinality=10
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.nation n]
+|     row-size=0B cardinality=25
+|
+00:SCAN DATA SOURCE [tpch_jdbc.region r]
+   row-size=0B cardinality=10
+====
+select o_orderstatus, count(*) from tpch_jdbc.orders
+group by o_orderstatus
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  group by: o_orderstatus
+|  row-size=20B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders]
+   row-size=0B cardinality=1.50M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: o_orderstatus
+|  row-size=20B cardinality=1.50M
+|
+02:EXCHANGE [HASH(o_orderstatus)]
+|
+01:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: o_orderstatus
+|  row-size=20B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders]
+   row-size=0B cardinality=1.50M
+====
+select c_custkey, c_name from tpch_jdbc.customer where
+c_custkey in (select o_custkey from tpch_jdbc.orders
+where o_totalprice > 50000)
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [RIGHT SEMI JOIN]
+|  hash predicates: o_custkey = c_custkey
+|  row-size=20B cardinality=150.00K
+|
+|--00:SCAN DATA SOURCE [tpch_jdbc.customer]
+|     row-size=0B cardinality=150.00K
+|
+01:SCAN DATA SOURCE [tpch_jdbc.orders]
+   data source predicates: o_totalprice > 50000
+   row-size=8B cardinality=1.50M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [RIGHT SEMI JOIN, PARTITIONED]
+|  hash predicates: o_custkey = c_custkey
+|  row-size=20B cardinality=150.00K
+|
+|--04:EXCHANGE [HASH(c_custkey)]
+|  |
+|  00:SCAN DATA SOURCE [tpch_jdbc.customer]
+|     row-size=0B cardinality=150.00K
+|
+03:EXCHANGE [HASH(o_custkey)]
+|
+01:SCAN DATA SOURCE [tpch_jdbc.orders]
+   data source predicates: o_totalprice > 50000
+   row-size=8B cardinality=1.50M
+====
+select c.c_custkey, count(l.l_orderkey) from tpch_jdbc.customer
+c join tpch_jdbc.orders o on c.c_custkey=o.o_custkey join
+tpch_jdbc.lineitem l on o.o_orderkey=l.l_orderkey group by c.c_custkey
+---- PLAN
+PLAN-ROOT SINK
+|
+05:AGGREGATE [FINALIZE]
+|  output: count(l.l_orderkey)
+|  group by: c.c_custkey
+|  row-size=16B cardinality=150.00K
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=32B cardinality=150.00K
+|
+|--02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=24B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+10:EXCHANGE [UNPARTITIONED]
+|
+09:AGGREGATE [FINALIZE]
+|  output: count:merge(l.l_orderkey)
+|  group by: c.c_custkey
+|  row-size=16B cardinality=150.00K
+|
+08:EXCHANGE [HASH(c.c_custkey)]
+|
+05:AGGREGATE [STREAMING]
+|  output: count(l.l_orderkey)
+|  group by: c.c_custkey
+|  row-size=16B cardinality=150.00K
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=32B cardinality=150.00K
+|
+|--07:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=24B cardinality=150.00K
+|
+|--06:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+====
+select max(l_extendedprice), min(l_extendedprice) from tpch_jdbc.lineitem
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: max(l_extendedprice), min(l_extendedprice)
+|  row-size=16B cardinality=1
+|
+00:SCAN DATA SOURCE [tpch_jdbc.lineitem]
+   row-size=0B cardinality=6.00M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: max:merge(l_extendedprice), min:merge(l_extendedprice)
+|  row-size=16B cardinality=1
+|
+02:EXCHANGE [UNPARTITIONED]
+|
+01:AGGREGATE
+|  output: max(l_extendedprice), min(l_extendedprice)
+|  row-size=16B cardinality=1
+|
+00:SCAN DATA SOURCE [tpch_jdbc.lineitem]
+   row-size=0B cardinality=6.00M
+====
+select c.c_custkey, o.o_orderdate from tpch_jdbc.customer c
+join tpch_jdbc.orders o on c.c_custkey=o.o_custkey where
+o.o_orderdate > '1995-01-01'
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=28B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     data source predicates: o.o_orderdate > '1995-01-01'
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=28B cardinality=150.00K
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     data source predicates: o.o_orderdate > '1995-01-01'
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+====
+select * from tpch_jdbc.customer where c_name like 'Customer#000%'
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer]
+   predicates: c_name LIKE 'Customer#000%'
+   row-size=78B cardinality=15.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer]
+   predicates: c_name LIKE 'Customer#000%'
+   row-size=78B cardinality=15.00K
+====
+select l_orderkey, avg(l_quantity), avg(l_discount)
+from tpch_jdbc.lineitem group by l_orderkey
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: avg(l_quantity), avg(l_discount)
+|  group by: l_orderkey
+|  row-size=24B cardinality=6.00M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.lineitem]
+   row-size=0B cardinality=6.00M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: avg:merge(l_quantity), avg:merge(l_discount)
+|  group by: l_orderkey
+|  row-size=24B cardinality=6.00M
+|
+02:EXCHANGE [HASH(l_orderkey)]
+|
+01:AGGREGATE [STREAMING]
+|  output: avg(l_quantity), avg(l_discount)
+|  group by: l_orderkey
+|  row-size=24B cardinality=6.00M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.lineitem]
+   row-size=0B cardinality=6.00M
+====
+select c.c_name, sum(l.l_extendedprice) from
+tpch_jdbc.customer c join tpch_jdbc.orders o on
+c.c_custkey=o.o_custkey join tpch_jdbc.lineitem l on
+o.o_orderkey=l.l_orderkey group by c.c_name
+---- PLAN
+PLAN-ROOT SINK
+|
+05:AGGREGATE [FINALIZE]
+|  output: sum(l.l_extendedprice)
+|  group by: c.c_name
+|  row-size=28B cardinality=150.00K
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=52B cardinality=150.00K
+|
+|--02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=36B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+10:EXCHANGE [UNPARTITIONED]
+|
+09:AGGREGATE [FINALIZE]
+|  output: sum:merge(l.l_extendedprice)
+|  group by: c.c_name
+|  row-size=28B cardinality=150.00K
+|
+08:EXCHANGE [HASH(c.c_name)]
+|
+05:AGGREGATE [STREAMING]
+|  output: sum(l.l_extendedprice)
+|  group by: c.c_name
+|  row-size=28B cardinality=150.00K
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=52B cardinality=150.00K
+|
+|--07:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=36B cardinality=150.00K
+|
+|--06:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+====
+select l.l_orderkey, o.o_orderpriority, count(*)
+from tpch_jdbc.orders o join tpch_jdbc.lineitem l on
+o.o_orderkey=l.l_orderkey group by l.l_orderkey, o.o_orderpriority
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  group by: l.l_orderkey, o.o_orderpriority
+|  row-size=28B cardinality=1.50M
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=28B cardinality=1.50M
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders o]
+   row-size=0B cardinality=1.50M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: l.l_orderkey, o.o_orderpriority
+|  row-size=28B cardinality=1.50M
+|
+05:EXCHANGE [HASH(l.l_orderkey,o.o_orderpriority)]
+|
+03:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: l.l_orderkey, o.o_orderpriority
+|  row-size=28B cardinality=1.50M
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=28B cardinality=1.50M
+|
+|--04:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders o]
+   row-size=0B cardinality=1.50M
+====
+select c.c_custkey from tpch_jdbc.customer c where
+exists (select 1 from tpch_jdbc.orders o where
+o.o_custkey=c.c_custkey)
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [RIGHT SEMI JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  row-size=8B cardinality=150.00K
+|
+|--00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+|     row-size=0B cardinality=150.00K
+|
+01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+   row-size=8B cardinality=1.50M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [RIGHT SEMI JOIN, PARTITIONED]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  row-size=8B cardinality=150.00K
+|
+|--04:EXCHANGE [HASH(c.c_custkey)]
+|  |
+|  00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+|     row-size=0B cardinality=150.00K
+|
+03:EXCHANGE [HASH(o.o_custkey)]
+|
+01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+   row-size=8B cardinality=1.50M
+====
+select o_orderpriority, sum(o_totalprice) from
+tpch_jdbc.orders group by o_orderpriority
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum(o_totalprice)
+|  group by: o_orderpriority
+|  row-size=28B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders]
+   row-size=0B cardinality=1.50M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: sum:merge(o_totalprice)
+|  group by: o_orderpriority
+|  row-size=28B cardinality=1.50M
+|
+02:EXCHANGE [HASH(o_orderpriority)]
+|
+01:AGGREGATE [STREAMING]
+|  output: sum(o_totalprice)
+|  group by: o_orderpriority
+|  row-size=28B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders]
+   row-size=0B cardinality=1.50M
+====
+select c.c_custkey, sum(l.l_extendedprice) from
+tpch_jdbc.customer c join tpch_jdbc.orders o on
+c.c_custkey=o.o_custkey join tpch_jdbc.lineitem l on
+o.o_orderkey=l.l_orderkey where l.l_discount > 0.05
+group by c.c_custkey
+---- PLAN
+PLAN-ROOT SINK
+|
+05:AGGREGATE [FINALIZE]
+|  output: sum(l.l_extendedprice)
+|  group by: c.c_custkey
+|  row-size=24B cardinality=150.00K
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=40B cardinality=150.00K
+|
+|--02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     data source predicates: l.l_discount > 0.05
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=24B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+10:EXCHANGE [UNPARTITIONED]
+|
+09:AGGREGATE [FINALIZE]
+|  output: sum:merge(l.l_extendedprice)
+|  group by: c.c_custkey
+|  row-size=24B cardinality=150.00K
+|
+08:EXCHANGE [HASH(c.c_custkey)]
+|
+05:AGGREGATE [STREAMING]
+|  output: sum(l.l_extendedprice)
+|  group by: c.c_custkey
+|  row-size=24B cardinality=150.00K
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=40B cardinality=150.00K
+|
+|--07:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     data source predicates: l.l_discount > 0.05
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=24B cardinality=150.00K
+|
+|--06:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+====
+select c.c_name, o.o_orderstatus, count(*) from
+tpch_jdbc.customer c join tpch_jdbc.orders o on
+c.c_custkey=o.o_custkey group by c.c_name, o.o_orderstatus
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  group by: c.c_name, o.o_orderstatus
+|  row-size=32B cardinality=150.00K
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=40B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: c.c_name, o.o_orderstatus
+|  row-size=32B cardinality=150.00K
+|
+05:EXCHANGE [HASH(c.c_name,o.o_orderstatus)]
+|
+03:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: c.c_name, o.o_orderstatus
+|  row-size=32B cardinality=150.00K
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=40B cardinality=150.00K
+|
+|--04:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+====
+select o_orderkey, max(l_shipdate), min(l_commitdate)
+from tpch_jdbc.orders o join tpch_jdbc.lineitem l on
+o.o_orderkey=l.l_orderkey group by o_orderkey
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: max(l_shipdate), min(l_commitdate)
+|  group by: o_orderkey
+|  row-size=32B cardinality=1.50M
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=40B cardinality=1.50M
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders o]
+   row-size=0B cardinality=1.50M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+06:AGGREGATE [FINALIZE]
+|  output: max:merge(l_shipdate), min:merge(l_commitdate)
+|  group by: o_orderkey
+|  row-size=32B cardinality=1.50M
+|
+05:EXCHANGE [HASH(o_orderkey)]
+|
+03:AGGREGATE [STREAMING]
+|  output: max(l_shipdate), min(l_commitdate)
+|  group by: o_orderkey
+|  row-size=32B cardinality=1.50M
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=40B cardinality=1.50M
+|
+|--04:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders o]
+   row-size=0B cardinality=1.50M
+====
+select sum(o_totalprice), avg(o_totalprice) from tpch_jdbc.orders
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum(o_totalprice), avg(o_totalprice)
+|  row-size=24B cardinality=1
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders]
+   row-size=0B cardinality=1.50M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: sum:merge(o_totalprice), avg:merge(o_totalprice)
+|  row-size=24B cardinality=1
+|
+02:EXCHANGE [UNPARTITIONED]
+|
+01:AGGREGATE
+|  output: sum(o_totalprice), avg(o_totalprice)
+|  row-size=24B cardinality=1
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders]
+   row-size=0B cardinality=1.50M
+====
+select c.c_custkey, c.c_name, sum(o.o_totalprice)
+from tpch_jdbc.customer c join tpch_jdbc.orders o on
+c.c_custkey=o.o_custkey group by c.c_custkey, c.c_name
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: sum(o.o_totalprice)
+|  group by: c.c_custkey, c.c_name
+|  row-size=36B cardinality=150.00K
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=36B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+06:AGGREGATE [FINALIZE]
+|  output: sum:merge(o.o_totalprice)
+|  group by: c.c_custkey, c.c_name
+|  row-size=36B cardinality=150.00K
+|
+05:EXCHANGE [HASH(c.c_custkey,c.c_name)]
+|
+03:AGGREGATE [STREAMING]
+|  output: sum(o.o_totalprice)
+|  group by: c.c_custkey, c.c_name
+|  row-size=36B cardinality=150.00K
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=36B cardinality=150.00K
+|
+|--04:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+====
+select c.c_custkey, o.o_orderkey, l.l_linenumber,
+l.l_extendedprice from tpch_jdbc.customer c join
+tpch_jdbc.orders o on c.c_custkey=o.o_custkey join
+tpch_jdbc.lineitem l on o.o_orderkey=l.l_orderkey
+where l.l_shipdate > '1996-01-01'
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=44B cardinality=150.00K
+|
+|--02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     data source predicates: l.l_shipdate > '1996-01-01'
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=24B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=44B cardinality=150.00K
+|
+|--06:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     data source predicates: l.l_shipdate > '1996-01-01'
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=24B cardinality=150.00K
+|
+|--05:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+====
+select * from tpch_jdbc.orders where o_orderdate between '1995-01-01' and 
'1996-01-01'
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders]
+   data source predicates: o_orderdate >= '1995-01-01', o_orderdate <= 
'1996-01-01'
+   row-size=88B cardinality=1.50M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders]
+   data source predicates: o_orderdate >= '1995-01-01', o_orderdate <= 
'1996-01-01'
+   row-size=88B cardinality=1.50M
+====
+
+select count(distinct c_custkey) from tpch_jdbc.customer
+---- PLAN
+PLAN-ROOT SINK
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(c_custkey)
+|  row-size=8B cardinality=1
+|
+01:AGGREGATE
+|  group by: c_custkey
+|  row-size=8B cardinality=150.00K
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(c_custkey)
+|  row-size=8B cardinality=1
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  output: count(c_custkey)
+|  row-size=8B cardinality=1
+|
+04:AGGREGATE
+|  group by: c_custkey
+|  row-size=8B cardinality=150.00K
+|
+03:EXCHANGE [HASH(c_custkey)]
+|
+01:AGGREGATE [STREAMING]
+|  group by: c_custkey
+|  row-size=8B cardinality=150.00K
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer]
+   row-size=0B cardinality=150.00K
+====
+select l_returnflag, l_linestatus, sum(l_quantity),
+sum(l_extendedprice) from tpch_jdbc.lineitem group by
+l_returnflag, l_linestatus
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum(l_quantity), sum(l_extendedprice)
+|  group by: l_returnflag, l_linestatus
+|  row-size=56B cardinality=6.00M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.lineitem]
+   row-size=0B cardinality=6.00M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: sum:merge(l_quantity), sum:merge(l_extendedprice)
+|  group by: l_returnflag, l_linestatus
+|  row-size=56B cardinality=6.00M
+|
+02:EXCHANGE [HASH(l_returnflag,l_linestatus)]
+|
+01:AGGREGATE [STREAMING]
+|  output: sum(l_quantity), sum(l_extendedprice)
+|  group by: l_returnflag, l_linestatus
+|  row-size=56B cardinality=6.00M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.lineitem]
+   row-size=0B cardinality=6.00M
+====
+select o_orderpriority, count(*) from tpch_jdbc.orders
+where o_orderdate < '1995-03-15' group by o_orderpriority
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  group by: o_orderpriority
+|  row-size=20B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders]
+   data source predicates: o_orderdate < '1995-03-15'
+   row-size=0B cardinality=1.50M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  group by: o_orderpriority
+|  row-size=20B cardinality=1.50M
+|
+02:EXCHANGE [HASH(o_orderpriority)]
+|
+01:AGGREGATE [STREAMING]
+|  output: count(*)
+|  group by: o_orderpriority
+|  row-size=20B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders]
+   data source predicates: o_orderdate < '1995-03-15'
+   row-size=0B cardinality=1.50M
+====
+select c.c_custkey, count(o.o_orderkey) from tpch_jdbc.customer
+c left join tpch_jdbc.orders o on c.c_custkey=o.o_custkey
+group by c.c_custkey
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(o.o_orderkey)
+|  group by: c.c_custkey
+|  row-size=16B cardinality=150.00K
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=24B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(o.o_orderkey)
+|  group by: c.c_custkey
+|  row-size=16B cardinality=150.00K
+|
+05:EXCHANGE [HASH(c.c_custkey)]
+|
+03:AGGREGATE [STREAMING]
+|  output: count(o.o_orderkey)
+|  group by: c.c_custkey
+|  row-size=16B cardinality=150.00K
+|
+02:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=24B cardinality=150.00K
+|
+|--04:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+====
+select c.c_custkey, o.o_orderkey, l.l_extendedprice from
+tpch_jdbc.customer c join tpch_jdbc.orders o on
+c.c_custkey=o.o_custkey join tpch_jdbc.lineitem l on
+o.o_orderkey=l.l_orderkey where l.l_discount < 0.07
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=40B cardinality=150.00K
+|
+|--02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     data source predicates: l.l_discount < 0.07
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=24B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=40B cardinality=150.00K
+|
+|--06:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     data source predicates: l.l_discount < 0.07
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=24B cardinality=150.00K
+|
+|--05:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+====
+select c.c_name, sum(o.o_totalprice), sum(l.l_extendedprice)
+from tpch_jdbc.customer c join tpch_jdbc.orders
+o on c.c_custkey=o.o_custkey join tpch_jdbc.lineitem
+l on o.o_orderkey=l.l_orderkey group by c.c_name
+---- PLAN
+PLAN-ROOT SINK
+|
+05:AGGREGATE [FINALIZE]
+|  output: sum(o.o_totalprice), sum(l.l_extendedprice)
+|  group by: c.c_name
+|  row-size=44B cardinality=150.00K
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=60B cardinality=150.00K
+|
+|--02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=44B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+10:EXCHANGE [UNPARTITIONED]
+|
+09:AGGREGATE [FINALIZE]
+|  output: sum:merge(o.o_totalprice), sum:merge(l.l_extendedprice)
+|  group by: c.c_name
+|  row-size=44B cardinality=150.00K
+|
+08:EXCHANGE [HASH(c.c_name)]
+|
+05:AGGREGATE [STREAMING]
+|  output: sum(o.o_totalprice), sum(l.l_extendedprice)
+|  group by: c.c_name
+|  row-size=44B cardinality=150.00K
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=60B cardinality=150.00K
+|
+|--07:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=44B cardinality=150.00K
+|
+|--06:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+====
+select l.l_orderkey, avg(l.l_quantity) from
+tpch_jdbc.lineitem l group by l.l_orderkey
+having avg(l.l_quantity) > 20
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: avg(l.l_quantity)
+|  group by: l.l_orderkey
+|  having: avg(l.l_quantity) > 20
+|  row-size=16B cardinality=600.12K
+|
+00:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+   row-size=0B cardinality=6.00M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: avg:merge(l.l_quantity)
+|  group by: l.l_orderkey
+|  having: avg(l.l_quantity) > 20
+|  row-size=16B cardinality=600.12K
+|
+02:EXCHANGE [HASH(l.l_orderkey)]
+|
+01:AGGREGATE [STREAMING]
+|  output: avg(l.l_quantity)
+|  group by: l.l_orderkey
+|  row-size=16B cardinality=6.00M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+   row-size=0B cardinality=6.00M
+====
+select o.o_orderkey, count(l.l_partkey) from
+tpch_jdbc.orders o join tpch_jdbc.lineitem l on
+o.o_orderkey=l.l_orderkey group by o.o_orderkey
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(l.l_partkey)
+|  group by: o.o_orderkey
+|  row-size=16B cardinality=1.50M
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=24B cardinality=1.50M
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders o]
+   row-size=0B cardinality=1.50M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(l.l_partkey)
+|  group by: o.o_orderkey
+|  row-size=16B cardinality=1.50M
+|
+05:EXCHANGE [HASH(o.o_orderkey)]
+|
+03:AGGREGATE [STREAMING]
+|  output: count(l.l_partkey)
+|  group by: o.o_orderkey
+|  row-size=16B cardinality=1.50M
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=24B cardinality=1.50M
+|
+|--04:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders o]
+   row-size=0B cardinality=1.50M
+====
+select n.n_name, count(c.c_custkey) from tpch_jdbc.nation n
+join tpch_jdbc.customer c on n.n_nationkey=c.c_nationkey group by n.n_name
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(c.c_custkey)
+|  group by: n.n_name
+|  row-size=20B cardinality=25
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: n.n_nationkey = c.c_nationkey
+|  row-size=24B cardinality=25
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.customer c]
+|     row-size=0B cardinality=150.00K
+|
+00:SCAN DATA SOURCE [tpch_jdbc.nation n]
+   row-size=0B cardinality=25
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(c.c_custkey)
+|  group by: n.n_name
+|  row-size=20B cardinality=25
+|
+05:EXCHANGE [HASH(n.n_name)]
+|
+03:AGGREGATE [STREAMING]
+|  output: count(c.c_custkey)
+|  group by: n.n_name
+|  row-size=20B cardinality=25
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: n.n_nationkey = c.c_nationkey
+|  row-size=24B cardinality=25
+|
+|--04:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.customer c]
+|     row-size=0B cardinality=150.00K
+|
+00:SCAN DATA SOURCE [tpch_jdbc.nation n]
+   row-size=0B cardinality=25
+====
+select r.r_name, count(n.n_name) from tpch_jdbc.region r
+join tpch_jdbc.nation n on r.r_regionkey=n.n_regionkey
+group by r.r_name
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(n.n_name)
+|  group by: r.r_name
+|  row-size=20B cardinality=10
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: r.r_regionkey = n.n_regionkey
+|  row-size=28B cardinality=10
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.nation n]
+|     row-size=0B cardinality=25
+|
+00:SCAN DATA SOURCE [tpch_jdbc.region r]
+   row-size=0B cardinality=10
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(n.n_name)
+|  group by: r.r_name
+|  row-size=20B cardinality=10
+|
+05:EXCHANGE [HASH(r.r_name)]
+|
+03:AGGREGATE [STREAMING]
+|  output: count(n.n_name)
+|  group by: r.r_name
+|  row-size=20B cardinality=10
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: r.r_regionkey = n.n_regionkey
+|  row-size=28B cardinality=10
+|
+|--04:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.nation n]
+|     row-size=0B cardinality=25
+|
+00:SCAN DATA SOURCE [tpch_jdbc.region r]
+   row-size=0B cardinality=10
+====
+select c.c_name, o.o_orderdate, l.l_shipdate from
+tpch_jdbc.customer c join tpch_jdbc.orders o on
+c.c_custkey=o.o_custkey join tpch_jdbc.lineitem l
+on o.o_orderkey=l.l_orderkey where o.o_orderdate > '1993-01-01'
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=68B cardinality=150.00K
+|
+|--02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=48B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     data source predicates: o.o_orderdate > '1993-01-01'
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=68B cardinality=150.00K
+|
+|--06:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=48B cardinality=150.00K
+|
+|--05:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     data source predicates: o.o_orderdate > '1993-01-01'
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+====
+select c.c_name, count(o.o_orderkey), sum(o.o_totalprice) from
+tpch_jdbc.customer c join tpch_jdbc.orders o on
+c.c_custkey=o.o_custkey group by c.c_name
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(o.o_orderkey), sum(o.o_totalprice)
+|  group by: c.c_name
+|  row-size=36B cardinality=150.00K
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=44B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+06:AGGREGATE [FINALIZE]
+|  output: count:merge(o.o_orderkey), sum:merge(o.o_totalprice)
+|  group by: c.c_name
+|  row-size=36B cardinality=150.00K
+|
+05:EXCHANGE [HASH(c.c_name)]
+|
+03:AGGREGATE [STREAMING]
+|  output: count(o.o_orderkey), sum(o.o_totalprice)
+|  group by: c.c_name
+|  row-size=36B cardinality=150.00K
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=44B cardinality=150.00K
+|
+|--04:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+====
+select o_orderstatus, avg(o_totalprice) from tpch_jdbc.orders
+group by o_orderstatus
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: avg(o_totalprice)
+|  group by: o_orderstatus
+|  row-size=20B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders]
+   row-size=0B cardinality=1.50M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+03:AGGREGATE [FINALIZE]
+|  output: avg:merge(o_totalprice)
+|  group by: o_orderstatus
+|  row-size=20B cardinality=1.50M
+|
+02:EXCHANGE [HASH(o_orderstatus)]
+|
+01:AGGREGATE [STREAMING]
+|  output: avg(o_totalprice)
+|  group by: o_orderstatus
+|  row-size=20B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.orders]
+   row-size=0B cardinality=1.50M
+====
+select c.c_name, o.o_orderstatus, l.l_returnflag from
+tpch_jdbc.customer c join tpch_jdbc.orders o on
+c.c_custkey=o.o_custkey join tpch_jdbc.lineitem l
+on o.o_orderkey=l.l_orderkey
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=68B cardinality=150.00K
+|
+|--02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=48B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=68B cardinality=150.00K
+|
+|--06:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=48B cardinality=150.00K
+|
+|--05:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+====
+select c.c_custkey, o.o_orderpriority, l.l_quantity from
+tpch_jdbc.customer c join tpch_jdbc.orders o on
+c.c_custkey=o.o_custkey join tpch_jdbc.lineitem l on
+o.o_orderkey=l.l_orderkey where l.l_shipdate < l.l_commitdate
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=76B cardinality=150.00K
+|
+|--02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     predicates: l.l_shipdate < l.l_commitdate
+|     row-size=0B cardinality=600.12K
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=36B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=76B cardinality=150.00K
+|
+|--06:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     predicates: l.l_shipdate < l.l_commitdate
+|     row-size=0B cardinality=600.12K
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=36B cardinality=150.00K
+|
+|--05:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+====
+select c.c_name, sum(l.l_discount) from tpch_jdbc.customer c join
+tpch_jdbc.orders o on c.c_custkey=o.o_custkey join tpch_jdbc.lineitem
+l on o.o_orderkey=l.l_orderkey group by c.c_name
+---- PLAN
+PLAN-ROOT SINK
+|
+05:AGGREGATE [FINALIZE]
+|  output: sum(l.l_discount)
+|  group by: c.c_name
+|  row-size=28B cardinality=150.00K
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=52B cardinality=150.00K
+|
+|--02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=36B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+10:EXCHANGE [UNPARTITIONED]
+|
+09:AGGREGATE [FINALIZE]
+|  output: sum:merge(l.l_discount)
+|  group by: c.c_name
+|  row-size=28B cardinality=150.00K
+|
+08:EXCHANGE [HASH(c.c_name)]
+|
+05:AGGREGATE [STREAMING]
+|  output: sum(l.l_discount)
+|  group by: c.c_name
+|  row-size=28B cardinality=150.00K
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o.o_orderkey = l.l_orderkey
+|  row-size=52B cardinality=150.00K
+|
+|--07:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN DATA SOURCE [tpch_jdbc.lineitem l]
+|     row-size=0B cardinality=6.00M
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=36B cardinality=150.00K
+|
+|--06:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+====
+select c.c_name, count(distinct o.o_orderkey) from tpch_jdbc.customer
+c join tpch_jdbc.orders o on c.c_custkey=o.o_custkey group by c.c_name
+---- PLAN
+PLAN-ROOT SINK
+|
+04:AGGREGATE [FINALIZE]
+|  output: count(o.o_orderkey)
+|  group by: c.c_name
+|  row-size=20B cardinality=150.00K
+|
+03:AGGREGATE
+|  group by: c.c_name, o.o_orderkey
+|  row-size=20B cardinality=150.00K
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=36B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+10:EXCHANGE [UNPARTITIONED]
+|
+09:AGGREGATE [FINALIZE]
+|  output: count:merge(o.o_orderkey)
+|  group by: c.c_name
+|  row-size=20B cardinality=150.00K
+|
+08:EXCHANGE [HASH(c.c_name)]
+|
+04:AGGREGATE [STREAMING]
+|  output: count(o.o_orderkey)
+|  group by: c.c_name
+|  row-size=20B cardinality=150.00K
+|
+07:AGGREGATE
+|  group by: c.c_name, o.o_orderkey
+|  row-size=20B cardinality=150.00K
+|
+06:EXCHANGE [HASH(c.c_name,o.o_orderkey)]
+|
+03:AGGREGATE [STREAMING]
+|  group by: c.c_name, o.o_orderkey
+|  row-size=20B cardinality=150.00K
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c.c_custkey = o.o_custkey
+|  row-size=36B cardinality=150.00K
+|
+|--05:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+   row-size=0B cardinality=150.00K
+====
+SELECT a.c_custkey
+FROM tpch_jdbc.customer a
+LEFT JOIN tpch_jdbc.orders b ON a.c_custkey = b.o_custkey
+LEFT JOIN tpch_jdbc.orders c ON b.o_custkey = c.o_custkey
+LEFT JOIN tpch_jdbc.customer d ON c.o_custkey = d.c_custkey
+LEFT JOIN tpch_jdbc.orders e ON d.c_custkey = e.o_custkey
+LEFT JOIN tpch_jdbc.customer f ON e.o_custkey = f.c_custkey
+ORDER BY a.c_custkey;
+---- PLAN
+PLAN-ROOT SINK
+|
+11:SORT
+|  order by: c_custkey ASC
+|  row-size=8B cardinality=150.00K
+|
+10:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: e.o_custkey = f.c_custkey
+|  row-size=48B cardinality=150.00K
+|
+|--05:SCAN DATA SOURCE [tpch_jdbc.customer f]
+|     row-size=0B cardinality=150.00K
+|
+09:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: d.c_custkey = e.o_custkey
+|  row-size=40B cardinality=150.00K
+|
+|--04:SCAN DATA SOURCE [tpch_jdbc.orders e]
+|     row-size=0B cardinality=1.50M
+|
+08:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: c.o_custkey = d.c_custkey
+|  row-size=32B cardinality=150.00K
+|
+|--03:SCAN DATA SOURCE [tpch_jdbc.customer d]
+|     row-size=0B cardinality=150.00K
+|
+07:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: b.o_custkey = c.o_custkey
+|  row-size=24B cardinality=150.00K
+|
+|--02:SCAN DATA SOURCE [tpch_jdbc.orders c]
+|     row-size=0B cardinality=1.50M
+|
+06:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: a.c_custkey = b.o_custkey
+|  row-size=16B cardinality=150.00K
+|
+|--01:SCAN DATA SOURCE [tpch_jdbc.orders b]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer a]
+   row-size=0B cardinality=150.00K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+17:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: c_custkey ASC
+|
+11:SORT
+|  order by: c_custkey ASC
+|  row-size=8B cardinality=150.00K
+|
+10:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+|  hash predicates: e.o_custkey = f.c_custkey
+|  row-size=48B cardinality=150.00K
+|
+|--16:EXCHANGE [BROADCAST]
+|  |
+|  05:SCAN DATA SOURCE [tpch_jdbc.customer f]
+|     row-size=0B cardinality=150.00K
+|
+09:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+|  hash predicates: d.c_custkey = e.o_custkey
+|  row-size=40B cardinality=150.00K
+|
+|--15:EXCHANGE [BROADCAST]
+|  |
+|  04:SCAN DATA SOURCE [tpch_jdbc.orders e]
+|     row-size=0B cardinality=1.50M
+|
+08:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+|  hash predicates: c.o_custkey = d.c_custkey
+|  row-size=32B cardinality=150.00K
+|
+|--14:EXCHANGE [BROADCAST]
+|  |
+|  03:SCAN DATA SOURCE [tpch_jdbc.customer d]
+|     row-size=0B cardinality=150.00K
+|
+07:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+|  hash predicates: b.o_custkey = c.o_custkey
+|  row-size=24B cardinality=150.00K
+|
+|--13:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN DATA SOURCE [tpch_jdbc.orders c]
+|     row-size=0B cardinality=1.50M
+|
+06:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
+|  hash predicates: a.c_custkey = b.o_custkey
+|  row-size=16B cardinality=150.00K
+|
+|--12:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN DATA SOURCE [tpch_jdbc.orders b]
+|     row-size=0B cardinality=1.50M
+|
+00:SCAN DATA SOURCE [tpch_jdbc.customer a]
+   row-size=0B cardinality=150.00K
+====
+WITH recent_orders AS (
+  SELECT o.o_orderkey, o.o_custkey, o.o_totalprice, o.o_orderdate
+  FROM tpch_jdbc.orders o WHERE o.o_orderdate >= '1995-01-01'
+)
+SELECT c.c_custkey, c.c_name, c.c_mktsegment,
+       COUNT(ro.o_orderkey) AS num_orders,
+       SUM(ro.o_totalprice) AS total_spent,
+       AVG(ro.o_totalprice) AS avg_order_value,
+       RANK() OVER (ORDER BY SUM(ro.o_totalprice) DESC) AS spend_rank,
+       CASE
+         WHEN SUM(ro.o_totalprice) > 500000 THEN 'VIP'
+         WHEN SUM(ro.o_totalprice) > 100000 THEN 'Premium'
+         ELSE 'Regular'
+       END AS customer_tier,
+       s.s_name AS supplier_name,
+       s.s_acctbal AS supplier_balance
+FROM tpch_jdbc.customer c
+JOIN recent_orders ro
+     ON c.c_custkey = ro.o_custkey
+JOIN tpch_jdbc.supplier s
+     ON c.c_nationkey = s.s_nationkey
+GROUP BY c.c_custkey, c.c_name, c.c_mktsegment, s.s_name, s.s_acctbal
+ORDER BY total_spent DESC
+LIMIT 5;
+---- PLAN
+PLAN-ROOT SINK
+|
+08:TOP-N [LIMIT=5]
+|  order by: sum(ro.o_totalprice) DESC
+|  row-size=92B cardinality=5
+|
+07:ANALYTIC
+|  functions: rank()
+|  order by: sum(ro.o_totalprice) DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=92B cardinality=1.50M
+|
+06:SORT
+|  order by: sum(ro.o_totalprice) DESC
+|  row-size=84B cardinality=1.50M
+|
+05:AGGREGATE [FINALIZE]
+|  output: count(o.o_orderkey), sum(o.o_totalprice), avg(o.o_totalprice)
+|  group by: c.c_custkey, c.c_name, c.c_mktsegment, s.s_name, s.s_acctbal
+|  row-size=84B cardinality=1.50M
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: c.c_nationkey = s.s_nationkey
+|  row-size=80B cardinality=1.50M
+|
+|--02:SCAN DATA SOURCE [tpch_jdbc.supplier s]
+|     row-size=0B cardinality=10.00K
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  row-size=58B cardinality=1.50M
+|
+|--00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+|     row-size=0B cardinality=150.00K
+|
+01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+   data source predicates: o.o_orderdate >= '1995-01-01'
+   row-size=24B cardinality=1.50M
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:TOP-N [LIMIT=5]
+|  order by: sum(ro.o_totalprice) DESC
+|  row-size=92B cardinality=5
+|
+07:ANALYTIC
+|  functions: rank()
+|  order by: sum(ro.o_totalprice) DESC
+|  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+|  row-size=92B cardinality=1.50M
+|
+13:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: sum(ro.o_totalprice) DESC
+|
+06:SORT
+|  order by: sum(ro.o_totalprice) DESC
+|  row-size=84B cardinality=1.50M
+|
+12:AGGREGATE [FINALIZE]
+|  output: count:merge(ro.o_orderkey), sum:merge(ro.o_totalprice), 
avg:merge(ro.o_totalprice)
+|  group by: c.c_custkey, c.c_name, c.c_mktsegment, s.s_name, s.s_acctbal
+|  row-size=84B cardinality=1.50M
+|
+11:EXCHANGE [HASH(c.c_custkey,c.c_name,c.c_mktsegment,s.s_name,s.s_acctbal)]
+|
+05:AGGREGATE [STREAMING]
+|  output: count(o.o_orderkey), sum(o.o_totalprice), avg(o.o_totalprice)
+|  group by: c.c_custkey, c.c_name, c.c_mktsegment, s.s_name, s.s_acctbal
+|  row-size=84B cardinality=1.50M
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c.c_nationkey = s.s_nationkey
+|  row-size=80B cardinality=1.50M
+|
+|--10:EXCHANGE [BROADCAST]
+|  |
+|  02:SCAN DATA SOURCE [tpch_jdbc.supplier s]
+|     row-size=0B cardinality=10.00K
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: o.o_custkey = c.c_custkey
+|  row-size=58B cardinality=1.50M
+|
+|--09:EXCHANGE [BROADCAST]
+|  |
+|  00:SCAN DATA SOURCE [tpch_jdbc.customer c]
+|     row-size=0B cardinality=150.00K
+|
+01:SCAN DATA SOURCE [tpch_jdbc.orders o]
+   data source predicates: o.o_orderdate >= '1995-01-01'
+   row-size=24B cardinality=1.50M
+====
\ No newline at end of file

Reply via email to