This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 9071030f7 IMPALA-12809: Iceberg metadata table scanner should always
be scheduled to the coordinator
9071030f7 is described below
commit 9071030f7fc272520c26ddb793551987226a5693
Author: Daniel Becker <[email protected]>
AuthorDate: Tue Mar 12 15:12:35 2024 +0100
IMPALA-12809: Iceberg metadata table scanner should always be scheduled to
the coordinator
On clusters with dedicated coordinators and executors, the Iceberg
metadata scanner fragment(s) can be scheduled to executors, for example
during a join. In this case the fragment will fail a precondition check,
because either the 'frontend_' object or the table will not be present.
This change forces Iceberg metadata scanner fragments to be scheduled on
the coordinator. It is not enough to set the DataPartition type to
UNPARTITIONED, because unpartitioned fragments can still be scheduled on
executors. This change introduces a new flag in the TPlanFragment thrift
struct - if it is true, the fragment is always scheduled on the
coordinator.
Testing:
- Added a regression test in test_coordinators.py.
- Added a new planner test with two metadata tables and a regular table
joined together.
Change-Id: Ib4397f64e9def42d2b84ffd7bc14ff31df27d58e
Reviewed-on: http://gerrit.cloudera.org:8080/21138
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/scheduling/schedule-state.cc | 7 +-
be/src/scheduling/schedule-state.h | 5 +-
be/src/scheduling/scheduler.cc | 18 ++--
common/thrift/Planner.thrift | 4 +
.../apache/impala/planner/DistributedPlanner.java | 7 +-
.../org/apache/impala/planner/PlanFragment.java | 20 +++-
.../org/apache/impala/planner/PlannerTest.java | 6 ++
...g-metadata-table-joined-with-regular-table.test | 103 +++++++++++++++++++++
tests/custom_cluster/test_coordinators.py | 20 ++++
9 files changed, 175 insertions(+), 15 deletions(-)
diff --git a/be/src/scheduling/schedule-state.cc
b/be/src/scheduling/schedule-state.cc
index c4b43ceb5..0c7fb261a 100644
--- a/be/src/scheduling/schedule-state.cc
+++ b/be/src/scheduling/schedule-state.cc
@@ -51,7 +51,8 @@ void FInstanceScheduleState::AddScanRanges(
FragmentScheduleState::FragmentScheduleState(
const TPlanFragment& fragment, FragmentExecParamsPB* exec_params)
- : is_coord_fragment(false), fragment(fragment), exec_params(exec_params) {
+ : scan_range_assignment{}, is_root_coord_fragment(false),
+ fragment(fragment), exec_params(exec_params) {
exec_params->set_fragment_idx(fragment.idx);
}
@@ -90,10 +91,10 @@ void ScheduleState::Init() {
it->second, query_schedule_pb_->add_fragment_exec_params());
}
- // mark coordinator fragment
+ // mark root coordinator fragment
const TPlanFragment& root_fragment = request_.plan_exec_info[0].fragments[0];
if (RequiresCoordinatorFragment()) {
- fragment_schedule_states_[root_fragment.idx].is_coord_fragment = true;
+ fragment_schedule_states_[root_fragment.idx].is_root_coord_fragment = true;
// the coordinator instance gets index 0, generated instance ids start at 1
next_instance_id_ = CreateInstanceId(next_instance_id_, 1);
}
diff --git a/be/src/scheduling/schedule-state.h
b/be/src/scheduling/schedule-state.h
index 80ad37798..cbfde738b 100644
--- a/be/src/scheduling/schedule-state.h
+++ b/be/src/scheduling/schedule-state.h
@@ -104,7 +104,8 @@ struct FragmentScheduleState {
/// For scheduling, refer to FInstanceExecParamsPB.per_node_scan_ranges
FragmentScanRangeAssignment scan_range_assignment;
- bool is_coord_fragment;
+ /// The root fragment of the plan runs on the coordinator.
+ bool is_root_coord_fragment;
const TPlanFragment& fragment;
/// Fragments that are inputs to an ExchangeNode of this fragment.
@@ -350,7 +351,7 @@ class ScheduleState {
std::unordered_map<int32_t, const TPlanFragment&> fragments_;
/// Populate fragments_ and fragment_schedule_states_ from
request_.plan_exec_info.
- /// Sets is_coord_fragment and exchange_input_fragments.
+ /// Sets is_root_coord_fragment and exchange_input_fragments.
/// Also populates plan_node_to_fragment_idx_ and
plan_node_to_plan_node_idx_.
void Init();
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 392f416fc..1809dce55 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -479,13 +479,19 @@ Status Scheduler::ComputeFragmentExecParams(const
ExecutorConfig& executor_confi
// the plan that must be executed at the coordinator or an unpartitioned
fragment
// that can be executed anywhere.
VLOG(3) << "Computing exec params for "
- << (fragment_state->is_coord_fragment ? "coordinator" :
"unpartitioned")
+ << (fragment_state->is_root_coord_fragment
+ ? "root coordinator" : "unpartitioned")
<< " fragment " << fragment_state->fragment.display_name;
NetworkAddressPB host;
NetworkAddressPB krpc_host;
- if (fragment_state->is_coord_fragment ||
executor_config.group.NumExecutors() == 0) {
- // The coordinator fragment must be scheduled on the coordinator.
Otherwise if
- // no executors are available, we need to schedule on the coordinator.
+ if (fragment_state->is_root_coord_fragment || fragment.is_coordinator_only
||
+ executor_config.group.NumExecutors() == 0) {
+ // 1. The root coordinator fragment
('fragment_state->is_root_coord_fragment') must
+ // be scheduled on the coordinator.
+ // 2. Some other fragments ('fragment.is_coordinator_only'), such as
+ // Iceberg metadata scanner fragments, must also run on the coordinator.
+ // 3. Otherwise if no executors are available, we need to schedule on the
+ // coordinator.
const BackendDescriptorPB& coord_desc = executor_config.coord_desc;
host = coord_desc.address();
DCHECK(coord_desc.has_krpc_address());
@@ -514,8 +520,8 @@ Status Scheduler::ComputeFragmentExecParams(const
ExecutorConfig& executor_confi
}
VLOG(3) << "Scheduled unpartitioned fragment on " << krpc_host;
DCHECK(IsResolvedAddress(krpc_host));
- // make sure that the coordinator instance ends up with instance idx 0
- UniqueIdPB instance_id = fragment_state->is_coord_fragment ?
+ // make sure that the root coordinator instance ends up with instance idx 0
+ UniqueIdPB instance_id = fragment_state->is_root_coord_fragment ?
state->query_id() :
state->GetNextInstanceId();
fragment_state->instance_states.emplace_back(
diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift
index c18294395..01249f8f2 100644
--- a/common/thrift/Planner.thrift
+++ b/common/thrift/Planner.thrift
@@ -90,6 +90,10 @@ struct TPlanFragment {
// greater than 0 if query option COMPUTE_PROCESSING_COST=true. Currently
not enforced
// when fragment need to exceed max_fs_writers query option (see
IMPALA-8125).
14: optional i32 effective_instance_count
+
+ // If true, the fragment must be scheduled on the coordinator. In this case
'partition'
+ // must be UNPARTITIONED.
+ 15: required bool is_coordinator_only
}
// location information for a single scan range
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 48daa6484..e984b8615 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -405,11 +405,12 @@ public class DistributedPlanner {
}
/**
- * Create an Iceberg Metadata scan fragment. This fragment is UNPARTITIONED,
so the
- * scheduler can schedule it as coordinator only fragment.
+ * Create an Iceberg Metadata scan fragment. This fragment is marked as a
coordinator
+ * only fragment.
*/
private PlanFragment createIcebergMetadataScanFragment(PlanNode node) {
- return new PlanFragment(ctx_.getNextFragmentId(), node,
DataPartition.UNPARTITIONED);
+ return new PlanFragment(ctx_.getNextFragmentId(), node,
DataPartition.UNPARTITIONED,
+ /* coordinatorOnly */ true);
}
/**
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index 33c5f24ba..e59df00cd 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -90,6 +90,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
private PlanId planId_;
private CohortId cohortId_;
+ private final boolean coordinatorOnly_;
+
// root of plan tree executed by this fragment
private PlanNode planRoot_;
@@ -178,12 +180,23 @@ public class PlanFragment extends TreeNode<PlanFragment> {
/**
* C'tor for fragment with specific partition; the output is by default
broadcast.
*/
- public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition
partition) {
+ public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition
partition,
+ boolean coordinatorOnly) {
fragmentId_ = id;
planRoot_ = root;
dataPartition_ = partition;
outputPartition_ = DataPartition.UNPARTITIONED;
setFragmentInPlanTree(planRoot_);
+ coordinatorOnly_ = coordinatorOnly;
+
+ // Coordinator-only fragments must be unpartitioned as there is only one
instance of
+ // them.
+ Preconditions.checkState(!coordinatorOnly ||
+ dataPartition_.equals(DataPartition.UNPARTITIONED));
+ }
+
+ public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition
partition) {
+ this(id, root, partition, false);
}
/**
@@ -480,6 +493,10 @@ public class PlanFragment extends TreeNode<PlanFragment> {
}
}
+ public boolean coordinatorOnly() {
+ return coordinatorOnly_;
+ }
+
public ResourceProfile getPerInstanceResourceProfile() {
return perInstanceResourceProfile_;
}
@@ -618,6 +635,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
perBackendResourceProfile_.getMinMemReservationBytes());
result.setThread_reservation(perInstanceResourceProfile_.getThreadReservation());
result.setEffective_instance_count(getAdjustedInstanceCount());
+ result.setIs_coordinator_only(coordinatorOnly_);
return result;
}
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 8695e9551..a64c75be1 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1352,6 +1352,12 @@ public class PlannerTest extends PlannerTestBase {
public void testIcebergMetadataTableScans() {
runPlannerTestFile("iceberg-metadata-table-scan", "functional_parquet",
ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
+
+ TQueryOptions options = defaultQueryOptions();
+ options.setExplain_level(TExplainLevel.EXTENDED);
+ runPlannerTestFile("iceberg-metadata-table-joined-with-regular-table",
+ "functional_parquet", options, ImmutableSet.of(
+ PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
}
/**
diff --git
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-joined-with-regular-table.test
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-joined-with-regular-table.test
new file mode 100644
index 000000000..771bef40e
--- /dev/null
+++
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-joined-with-regular-table.test
@@ -0,0 +1,103 @@
+select count(DISTINCT a.parent_id, b.is_current_ancestor)
+from functional_parquet.iceberg_query_metadata.history a
+join functional_parquet.iceberg_alltypes_part.history b
+on a.is_current_ancestor = b.is_current_ancestor
+join functional_parquet.alltypes c
+on a.is_current_ancestor = c.bool_col;
+---- DISTRIBUTEDPLAN
+F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+| Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B
thread-reservation=1
+PLAN-ROOT SINK
+| output exprs: count(if(a.parent_id IS NULL, NULL, b.is_current_ancestor))
+| mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+12:AGGREGATE [FINALIZE]
+| output: count:merge(if(a.parent_id IS NULL, NULL, b.is_current_ancestor))
+| mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB
thread-reservation=0
+| tuple-ids=4 row-size=8B cardinality=1
+| in pipelines: 12(GETNEXT), 06(OPEN)
+|
+11:EXCHANGE [UNPARTITIONED]
+| mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| tuple-ids=4 row-size=8B cardinality=1
+| in pipelines: 06(GETNEXT)
+|
+F03:PLAN FRAGMENT [HASH(a.parent_id,b.is_current_ancestor)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=128.07MB mem-reservation=34.00MB
thread-reservation=1
+06:AGGREGATE
+| output: count(if(a.parent_id IS NULL, NULL, b.is_current_ancestor))
+| mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB
thread-reservation=0
+| tuple-ids=4 row-size=8B cardinality=1
+| in pipelines: 06(GETNEXT), 10(OPEN)
+|
+10:AGGREGATE
+| group by: a.parent_id, b.is_current_ancestor
+| mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB
thread-reservation=0
+| tuple-ids=3 row-size=9B cardinality=11.96K
+| in pipelines: 10(GETNEXT), 02(OPEN)
+|
+09:EXCHANGE [HASH(a.parent_id,b.is_current_ancestor)]
+| mem-estimate=73.97KB mem-reservation=0B thread-reservation=0
+| tuple-ids=3 row-size=9B cardinality=11.96K
+| in pipelines: 02(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=4.14GB mem-reservation=104.01MB
thread-reservation=2 runtime-filters-memory=2.00MB
+05:AGGREGATE [STREAMING]
+| group by: a.parent_id, b.is_current_ancestor
+| mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB
thread-reservation=0
+| tuple-ids=3 row-size=9B cardinality=11.96K
+| in pipelines: 02(GETNEXT)
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+| hash predicates: a.is_current_ancestor = b.is_current_ancestor
+| fk/pk conjuncts: assumed fk/pk
+| runtime filters: RF000[bloom] <- b.is_current_ancestor
+| mem-estimate=2.00GB mem-reservation=34.00MB spill-buffer=2.00MB
thread-reservation=0
+| tuple-ids=2,0,1 row-size=11B cardinality=11.96K
+| in pipelines: 02(GETNEXT), 01(OPEN)
+|
+|--08:EXCHANGE [BROADCAST]
+| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| | tuple-ids=1 row-size=1B cardinality=unavailable
+| | in pipelines: 01(GETNEXT)
+| |
+| F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+| Per-Host Resources: mem-estimate=20.00KB mem-reservation=0B
thread-reservation=1
+| 01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part.HISTORY
b]
+| mem-estimate=0B mem-reservation=0B thread-reservation=0
+| tuple-ids=1 row-size=1B cardinality=unavailable
+| in pipelines: 01(GETNEXT)
+|
+03:HASH JOIN [INNER JOIN, BROADCAST]
+| hash predicates: c.bool_col = a.is_current_ancestor
+| fk/pk conjuncts: assumed fk/pk
+| runtime filters: RF002[bloom] <- a.is_current_ancestor
+| mem-estimate=2.00GB mem-reservation=34.00MB spill-buffer=2.00MB
thread-reservation=0
+| tuple-ids=2,0 row-size=10B cardinality=11.96K
+| in pipelines: 02(GETNEXT), 00(OPEN)
+|
+|--07:EXCHANGE [BROADCAST]
+| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+| | tuple-ids=0 row-size=9B cardinality=unavailable
+| | in pipelines: 00(GETNEXT)
+| |
+| F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+| Per-Host Resources: mem-estimate=52.00KB mem-reservation=0B
thread-reservation=1
+| 00:SCAN ICEBERG METADATA [functional_parquet.iceberg_query_metadata.HISTORY
a]
+| mem-estimate=0B mem-reservation=0B thread-reservation=0
+| tuple-ids=0 row-size=9B cardinality=unavailable
+| in pipelines: 00(GETNEXT)
+|
+02:SCAN HDFS [functional_parquet.alltypes c, RANDOM]
+ HDFS partitions=24/24 files=24 size=187.63KB
+ runtime filters: RF000[bloom] -> c.bool_col, RF002[bloom] -> c.bool_col
+ stored statistics:
+ table: rows=unavailable size=unavailable
+ partitions: 0/24 rows=11.96K
+ columns: unavailable
+ extrapolated-rows=disabled max-scan-range-rows=unavailable
+ mem-estimate=16.00MB mem-reservation=8.00KB thread-reservation=1
+ tuple-ids=2 row-size=1B cardinality=11.96K
+ in pipelines: 02(GETNEXT)
+====
diff --git a/tests/custom_cluster/test_coordinators.py
b/tests/custom_cluster/test_coordinators.py
index 436e26572..ee101a6cd 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -291,6 +291,26 @@ class TestCoordinators(CustomClusterTestSuite):
assert client is not None
self._stop_impala_cluster()
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(cluster_size=3,
num_exclusive_coordinators=1)
+ def test_iceberg_metadata_scan_on_coord(self):
+ """ Tests that Iceberg metadata scan fragments are scheduled on the
coordinator. If
+ such a fragment is scheduled on an executor, the queries below fail.
Regression test
+ for IMPALA-12809"""
+ # A metadata table joined with itself.
+ q1 = """select count(b.parent_id)
+ from functional_parquet.iceberg_query_metadata.history a
+ join functional_parquet.iceberg_query_metadata.history b
+ on a.snapshot_id = b.snapshot_id"""
+ self.execute_query_expect_success(self.client, q1)
+
+ # A metadata table joined with a regular table.
+ q2 = """select count(DISTINCT a.parent_id, a.is_current_ancestor)
+ from functional_parquet.iceberg_query_metadata.history a
+ join functional_parquet.alltypestiny c
+ on a.is_current_ancestor = c.bool_col"""
+ self.execute_query_expect_success(self.client, q2)
+
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(impalad_args="--queue_wait_timeout_ms=2000",
cluster_size=1,
num_exclusive_coordinators=1)