This is an automated email from the ASF dual-hosted git repository.
dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new f68d91dce IMPALA-12610: Support reading ARRAY columns for Iceberg
Metadata tables
f68d91dce is described below
commit f68d91dcee76ccd90dfc7f1111623ea531794d03
Author: Tamas Mate <[email protected]>
AuthorDate: Fri Feb 23 10:23:24 2024 +0100
IMPALA-12610: Support reading ARRAY columns for Iceberg Metadata tables
This commit adds support for reading ARRAY columns inside Iceberg
Metadata tables.
The change starts with some refactoring: to consolidate JVM access
through JNI, a new backend class, IcebergMetadataScanner, was introduced.
This class is the C++ counterpart of the Java IcebergMetadataScanner and
is responsible for managing the Java scanner object.
In Iceberg, array types do not have accessors, so structs inside arrays
have to be accessed by position; therefore, the value-obtaining logic has
been changed to allow access by position.
The IcebergRowReader needs an IcebergMetadataScanner so that it can
iterate over the arrays returned by the scanner and add them to the
collection.
This change does not cover MAP columns; these slots are set to NULL.
MAP support will be added in IMPALA-12611.
Testing:
- Added E2E tests.
Change-Id: Ieb9bac1822a17bd3cd074b4b98e4d010703cecb1
Reviewed-on: http://gerrit.cloudera.org:8080/21061
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Gabor Kaszab <[email protected]>
---
be/src/exec/iceberg-metadata/CMakeLists.txt | 1 +
.../iceberg-metadata/iceberg-metadata-scan-node.cc | 133 ++-----------
.../iceberg-metadata/iceberg-metadata-scan-node.h | 48 +----
.../iceberg-metadata/iceberg-metadata-scanner.cc | 213 +++++++++++++++++++++
.../iceberg-metadata/iceberg-metadata-scanner.h | 123 ++++++++++++
be/src/exec/iceberg-metadata/iceberg-row-reader.cc | 169 +++++++++++-----
be/src/exec/iceberg-metadata/iceberg-row-reader.h | 68 ++++---
be/src/service/impalad-main.cc | 4 +-
.../org/apache/impala/analysis/FromClause.java | 5 +-
.../apache/impala/util/IcebergMetadataScanner.java | 82 +++++---
.../queries/QueryTest/iceberg-metadata-tables.test | 180 ++++++++++++++---
11 files changed, 734 insertions(+), 292 deletions(-)
diff --git a/be/src/exec/iceberg-metadata/CMakeLists.txt
b/be/src/exec/iceberg-metadata/CMakeLists.txt
index ea63de71a..35e25cf38 100644
--- a/be/src/exec/iceberg-metadata/CMakeLists.txt
+++ b/be/src/exec/iceberg-metadata/CMakeLists.txt
@@ -23,6 +23,7 @@ set(EXECUTABLE_OUTPUT_PATH
"${BUILD_OUTPUT_ROOT_DIRECTORY}/exec/iceberg-metadata
add_library(ExecIcebergMetadata
iceberg-metadata-scan-node.cc
+ iceberg-metadata-scanner.cc
iceberg-row-reader.cc
)
diff --git a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc
b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc
index 352371930..bb4b4c9b2 100644
--- a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc
+++ b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc
@@ -39,114 +39,32 @@
IcebergMetadataScanNode::IcebergMetadataScanNode(ObjectPool* pool,
table_name_(pnode.tnode_->iceberg_scan_metadata_node.table_name),
metadata_table_name_(pnode.tnode_->iceberg_scan_metadata_node.metadata_table_name)
{}
-Status IcebergMetadataScanNode::InitJNI() {
- DCHECK(impala_iceberg_metadata_scanner_cl_ == nullptr) << "InitJNI() already
called!";
- JNIEnv* env = JniUtil::GetJNIEnv();
- if (env == nullptr) return Status("Failed to get/create JVM");
- // Global class references:
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env,
- "org/apache/impala/util/IcebergMetadataScanner",
- &impala_iceberg_metadata_scanner_cl_));
- // Method ids:
- RETURN_IF_ERROR(JniUtil::GetMethodID(env,
impala_iceberg_metadata_scanner_cl_,
- "<init>",
"(Lorg/apache/impala/catalog/FeIcebergTable;Ljava/lang/String;)V",
- &iceberg_metadata_scanner_ctor_));
- RETURN_IF_ERROR(JniUtil::GetMethodID(env,
impala_iceberg_metadata_scanner_cl_,
- "ScanMetadataTable", "()V", &scan_metadata_table_));
- RETURN_IF_ERROR(JniUtil::GetMethodID(env,
impala_iceberg_metadata_scanner_cl_,
- "GetNext", "()Lorg/apache/iceberg/StructLike;", &get_next_));
- RETURN_IF_ERROR(JniUtil::GetMethodID(env,
impala_iceberg_metadata_scanner_cl_,
- "GetAccessor", "(I)Lorg/apache/iceberg/Accessor;", &get_accessor_));
- return Status::OK();
-}
-
Status IcebergMetadataScanNode::Prepare(RuntimeState* state) {
RETURN_IF_ERROR(ScanNode::Prepare(state));
scan_prepare_timer_ = ADD_TIMER(runtime_profile(), "ScanPrepareTime");
iceberg_api_scan_timer_ = ADD_TIMER(runtime_profile(), "IcebergApiScanTime");
- SCOPED_TIMER(scan_prepare_timer_);
- JNIEnv* env = JniUtil::GetJNIEnv();
- if (env == nullptr) return Status("Failed to get/create JVM");
tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
if (tuple_desc_ == nullptr) {
return Status("Failed to get tuple descriptor, tuple id: " +
std::to_string(tuple_id_));
}
- // Get the FeTable object from the Frontend
- jobject jtable;
- RETURN_IF_ERROR(GetCatalogTable(&jtable));
- // Create the Java Scanner object and scan the table
- jstring jstr_metadata_table_name =
env->NewStringUTF(metadata_table_name_.c_str());
- jobject jmetadata_scanner =
env->NewObject(impala_iceberg_metadata_scanner_cl_,
- iceberg_metadata_scanner_ctor_, jtable, jstr_metadata_table_name);
- RETURN_ERROR_IF_EXC(env);
- RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jmetadata_scanner,
&jmetadata_scanner_));
- RETURN_ERROR_IF_EXC(env);
- RETURN_IF_ERROR(ScanMetadataTable());
- RETURN_IF_ERROR(CreateFieldAccessors());
- return Status::OK();
-}
-
-Status IcebergMetadataScanNode::CreateFieldAccessors() {
- JNIEnv* env = JniUtil::GetJNIEnv();
- if (env == nullptr) return Status("Failed to get/create JVM");
- for (SlotDescriptor* slot_desc: tuple_desc_->slots()) {
- int field_id = -1;
- if (slot_desc->col_path().size() == 1) {
- // Top level slots have ColumnDescriptors that store the field ids.
- field_id =
tuple_desc_->table_desc()->GetColumnDesc(slot_desc).field_id();
- } else {
- // Non top level slots are fields of a nested type. This code path is to
handle
- // slots that does not have their nested type's tuple available.
- // This loop finds the struct ColumnType node that stores the slot as it
has the
- // field id list of its children.
- int root_type_index = slot_desc->col_path()[0];
- ColumnType* current_type = &const_cast<ColumnType&>(
- tuple_desc_->table_desc()->col_descs()[root_type_index].type());
- for (int i = 1; i < slot_desc->col_path().size() - 1; ++i) {
- current_type = ¤t_type->children[slot_desc->col_path()[i]];
- }
- field_id = current_type->field_ids[slot_desc->col_path().back()];
- }
- DCHECK_NE(field_id, -1);
- RETURN_IF_ERROR(AddAccessorForFieldId(env, field_id, slot_desc->id()));
- if (slot_desc->type().IsStructType()) {
- RETURN_IF_ERROR(CreateFieldAccessors(env, slot_desc));
- }
- }
- return Status::OK();
-}
-
-Status IcebergMetadataScanNode::CreateFieldAccessors(JNIEnv* env,
- const SlotDescriptor* struct_slot_desc) {
- if (!struct_slot_desc->type().IsStructType()) return Status::OK();
- const std::vector<int>& struct_field_ids =
struct_slot_desc->type().field_ids;
- for (SlotDescriptor* child_slot_desc:
- struct_slot_desc->children_tuple_descriptor()->slots()) {
- int field_id = struct_field_ids[child_slot_desc->col_path().back()];
- RETURN_IF_ERROR(AddAccessorForFieldId(env, field_id,
child_slot_desc->id()));
- if (child_slot_desc->type().IsStructType()) {
- RETURN_IF_ERROR(CreateFieldAccessors(env, child_slot_desc));
- }
- }
- return Status::OK();
-}
-
-Status IcebergMetadataScanNode::AddAccessorForFieldId(JNIEnv* env, int
field_id,
- SlotId slot_id) {
- jobject accessor_for_field = env->CallObjectMethod(jmetadata_scanner_,
- get_accessor_, field_id);
- RETURN_ERROR_IF_EXC(env);
- jobject accessor_for_field_global_ref;
- RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, accessor_for_field,
- &accessor_for_field_global_ref));
- jaccessors_[slot_id] = accessor_for_field_global_ref;
return Status::OK();
}
Status IcebergMetadataScanNode::Open(RuntimeState* state) {
+ SCOPED_TIMER(scan_prepare_timer_);
RETURN_IF_ERROR(ScanNode::Open(state));
- iceberg_row_reader_.reset(new IcebergRowReader(jaccessors_));
+ JNIEnv* env = JniUtil::GetJNIEnv();
+ if (env == nullptr) return Status("Failed to get/create JVM");
+ // Get the FeTable object from the Frontend
+ jobject jtable;
+ RETURN_IF_ERROR(GetCatalogTable(&jtable));
+ metadata_scanner_.reset(new IcebergMetadataScanner(jtable,
metadata_table_name_.c_str(),
+ tuple_desc_));
+ RETURN_IF_ERROR(metadata_scanner_->Init(env));
+ iceberg_row_reader_.reset(new IcebergRowReader(this,
metadata_scanner_.get()));
+ SCOPED_TIMER(iceberg_api_scan_timer_);
+ RETURN_IF_ERROR(metadata_scanner_->ScanMetadataTable(env));
return Status::OK();
}
@@ -167,9 +85,10 @@ Status IcebergMetadataScanNode::GetNext(RuntimeState*
state, RowBatch* row_batch
int row_idx = row_batch->AddRow();
TupleRow* tuple_row = row_batch->GetRow(row_idx);
tuple_row->SetTuple(0, tuple);
+
// Get the next row from 'org.apache.impala.util.IcebergMetadataScanner'
- jobject struct_like_row = env->CallObjectMethod(jmetadata_scanner_,
get_next_);
- RETURN_ERROR_IF_EXC(env);
+ jobject struct_like_row;
+ RETURN_IF_ERROR(metadata_scanner_->GetNext(env, &struct_like_row));
// When 'struct_like_row' is null, there are no more rows to read
if (struct_like_row == nullptr) {
*eos = true;
@@ -177,7 +96,7 @@ Status IcebergMetadataScanNode::GetNext(RuntimeState* state,
RowBatch* row_batch
}
// Translate a StructLikeRow from Iceberg to Tuple
RETURN_IF_ERROR(iceberg_row_reader_->MaterializeTuple(env, struct_like_row,
- tuple_desc_, tuple, row_batch->tuple_data_pool()));
+ tuple_desc_, tuple, row_batch->tuple_data_pool(), state));
env->DeleteLocalRef(struct_like_row);
RETURN_ERROR_IF_EXC(env);
COUNTER_ADD(rows_read_counter(), 1);
@@ -201,16 +120,7 @@ Status IcebergMetadataScanNode::GetNext(RuntimeState*
state, RowBatch* row_batch
void IcebergMetadataScanNode::Close(RuntimeState* state) {
if (is_closed()) return;
- JNIEnv* env = JniUtil::GetJNIEnv();
- if (env != nullptr) {
- // Close global references
- if (jmetadata_scanner_ != nullptr)
env->DeleteGlobalRef(jmetadata_scanner_);
- for (auto accessor : jaccessors_) {
- if (accessor.second != nullptr) env->DeleteGlobalRef(accessor.second);
- }
- } else {
- LOG(ERROR) << "Couldn't get JNIEnv, unable to release Global JNI
references";
- }
+ metadata_scanner_->Close(state);
ScanNode::Close(state);
}
@@ -219,12 +129,3 @@ Status IcebergMetadataScanNode::GetCatalogTable(jobject*
jtable) {
RETURN_IF_ERROR(fe->GetCatalogTable(table_name_, jtable));
return Status::OK();
}
-
-Status IcebergMetadataScanNode::ScanMetadataTable() {
- JNIEnv* env = JniUtil::GetJNIEnv();
- if (env == nullptr) return Status("Failed to get/create JVM");
- SCOPED_TIMER(iceberg_api_scan_timer_);
- env->CallObjectMethod(jmetadata_scanner_, scan_metadata_table_);
- RETURN_ERROR_IF_EXC(env);
- return Status::OK();
-}
diff --git a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h
b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h
index ae15efbd4..d970e503c 100644
--- a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h
+++ b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h
@@ -18,6 +18,7 @@
#pragma once
#include "exec/iceberg-metadata/iceberg-row-reader.h"
+#include "exec/iceberg-metadata/iceberg-metadata-scanner.h"
#include "exec/scan-node.h"
#include <jni.h>
@@ -56,7 +57,7 @@ class Status;
class IcebergMetadataScanPlanNode : public ScanPlanNode {
public:
Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
- ~IcebergMetadataScanPlanNode(){}
+ ~IcebergMetadataScanPlanNode() {}
};
class IcebergMetadataScanNode : public ScanNode {
@@ -64,10 +65,6 @@ class IcebergMetadataScanNode : public ScanNode {
IcebergMetadataScanNode(ObjectPool* pool, const IcebergMetadataScanPlanNode&
pnode,
const DescriptorTbl& descs);
- /// JNI setup. Creates global references for Java classes and find method
ids.
- /// Initializes static members, should be called once per process lifecycle.
- static Status InitJNI() WARN_UNUSED_RESULT;
-
/// Initializes counters, executes Iceberg table scan and initializes
accessors.
Status Prepare(RuntimeState* state) override;
@@ -81,27 +78,13 @@ class IcebergMetadataScanNode : public ScanNode {
void Close(RuntimeState* state) override;
private:
- /// Global class references created with JniUtil.
- inline static jclass impala_iceberg_metadata_scanner_cl_ = nullptr;
-
- /// Method references created with JniUtil.
- inline static jmethodID iceberg_metadata_scanner_ctor_ = nullptr;
- inline static jmethodID scan_metadata_table_ = nullptr;
- inline static jmethodID get_accessor_ = nullptr;
- inline static jmethodID get_next_ = nullptr;
-
- /// Iceberg metadata scanner Java object, it helps preparing the metadata
table and
- /// executes an Iceberg table scan. Allows the ScanNode to fetch the
metadata from
- /// the Java Heap.
- jobject jmetadata_scanner_;
+ /// Adapter that helps preparing the metadata table and executes an Iceberg
table scan
+ /// on Java side. Allows the ScanNode to fetch the metadata from the Java
Heap.
+ std::unique_ptr<IcebergMetadataScanner> metadata_scanner_;
/// Helper class to transform Iceberg rows to Impala tuples.
std::unique_ptr<IcebergRowReader> iceberg_row_reader_;
- /// Accessor map for the scan result, pairs the slot ids with the java
Accessor
- /// objects.
- std::unordered_map<SlotId, jobject> jaccessors_;
-
// The TupleId and TupleDescriptor of the tuple that this scan node will
populate.
const TupleId tuple_id_;
const TupleDescriptor* tuple_desc_ = nullptr;
@@ -114,29 +97,8 @@ class IcebergMetadataScanNode : public ScanNode {
RuntimeProfile::Counter* scan_prepare_timer_;
RuntimeProfile::Counter* iceberg_api_scan_timer_;
- /// Initializes the metadata table and executes an Iceberg scan through JNI.
- Status ScanMetadataTable();
-
/// Gets the FeIceberg table from the Frontend.
Status GetCatalogTable(jobject* jtable);
-
- /// Populates the jaccessors_ map by creating the accessors for the columns
in the JVM.
- /// To create a field accessor for a column the Iceberg field id is needed.
For columns
- /// that are not a field of a struct, this can be found in the
ColumnDescriptor.
- /// However, ColumnDescriptors are not available for struct fields, in this
case the
- /// ColumnType of the SlotDescriptor can be used.
- Status CreateFieldAccessors();
-
- /// Recursive part of the Accessor collection, when there is a struct in the
tuple.
- /// Collects the field ids of the struct members. The type_ field inside the
struct slot
- /// stores an ordered list of Iceberg Struct member field ids. This list can
be indexed
- /// with the last element of SchemaPath col_path to obtain the correct field
id of the
- /// struct member.
- Status CreateFieldAccessors(JNIEnv* env, const SlotDescriptor*
struct_slot_desc);
-
- /// Helper method to simplify adding new accessors to the jaccessors_ map.
It obtains
- /// the Accessor through JNI and persists it into the jaccessors_ map.
- Status AddAccessorForFieldId(JNIEnv* env, int field_id, SlotId slot_id);
};
}
diff --git a/be/src/exec/iceberg-metadata/iceberg-metadata-scanner.cc
b/be/src/exec/iceberg-metadata/iceberg-metadata-scanner.cc
new file mode 100644
index 000000000..3443eca67
--- /dev/null
+++ b/be/src/exec/iceberg-metadata/iceberg-metadata-scanner.cc
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/iceberg-metadata/iceberg-metadata-scanner.h"
+#include "util/jni-util.h"
+
+namespace impala {
+
+IcebergMetadataScanner::IcebergMetadataScanner(jobject jtable,
+ const char* metadata_table_name, const TupleDescriptor* tuple_desc)
+ : jtable_(jtable),
+ metadata_table_name_(metadata_table_name),
+ tuple_desc_(tuple_desc) {}
+
+Status IcebergMetadataScanner::InitJNI() {
+ DCHECK(iceberg_metadata_scanner_cl_ == nullptr) << "InitJNI() already
called!";
+ JNIEnv* env = JniUtil::GetJNIEnv();
+ if (env == nullptr) return Status("Failed to get/create JVM");
+
+ // Global class references:
+ RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env,
+ "org/apache/impala/util/IcebergMetadataScanner",
+ &iceberg_metadata_scanner_cl_));
+ RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env,
+ "org/apache/impala/util/IcebergMetadataScanner$ArrayScanner",
+ &iceberg_metadata_scanner_array_scanner_cl_));
+ RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env,
+ "org/apache/iceberg/Accessor", &accessor_cl_));
+
+ // Method ids:
+ RETURN_IF_ERROR(JniUtil::GetMethodID(env, accessor_cl_, "get",
+ "(Ljava/lang/Object;)Ljava/lang/Object;", &accessor_get_));
+ RETURN_IF_ERROR(JniUtil::GetMethodID(env, iceberg_metadata_scanner_cl_,
+ "<init>",
"(Lorg/apache/impala/catalog/FeIcebergTable;Ljava/lang/String;)V",
+ &iceberg_metadata_scanner_ctor_));
+ RETURN_IF_ERROR(JniUtil::GetMethodID(env, iceberg_metadata_scanner_cl_,
+ "ScanMetadataTable", "()V",
&iceberg_metadata_scanner_scan_metadata_table_));
+ RETURN_IF_ERROR(JniUtil::GetMethodID(env, iceberg_metadata_scanner_cl_,
+ "GetNext", "()Lorg/apache/iceberg/StructLike;",
+ &iceberg_metadata_scanner_get_next_));
+ RETURN_IF_ERROR(JniUtil::GetMethodID(env, iceberg_metadata_scanner_cl_,
+ "GetValueByFieldId",
"(Lorg/apache/iceberg/StructLike;I)Ljava/lang/Object;",
+ &iceberg_metadata_scanner_get_value_by_field_id_));
+ RETURN_IF_ERROR(JniUtil::GetMethodID(env, iceberg_metadata_scanner_cl_,
+ "GetValueByPosition",
+ "(Lorg/apache/iceberg/StructLike;ILjava/lang/Class;)Ljava/lang/Object;",
+ &iceberg_metadata_scanner_get_value_by_position_));
+ RETURN_IF_ERROR(JniUtil::GetMethodID(env,
+ iceberg_metadata_scanner_array_scanner_cl_, "<init>",
+ "(Lorg/apache/impala/util/IcebergMetadataScanner;Ljava/util/List;)V",
+ &iceberg_metadata_scanner_array_scanner_ctor_));
+ RETURN_IF_ERROR(JniUtil::GetMethodID(env,
+ iceberg_metadata_scanner_array_scanner_cl_,
+ "GetNextArrayItem", "()Ljava/lang/Object;",
+ &iceberg_metadata_scanner_array_scanner_get_next_array_item_));
+ return Status::OK();
+}
+
+Status IcebergMetadataScanner::Init(JNIEnv* env) {
+ jstring jstr_metadata_table_name = env->NewStringUTF(metadata_table_name_);
+ jobject jmetadata_scanner = env->NewObject(iceberg_metadata_scanner_cl_,
+ iceberg_metadata_scanner_ctor_, jtable_, jstr_metadata_table_name);
+ RETURN_ERROR_IF_EXC(env);
+ RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jmetadata_scanner,
&jmetadata_scanner_));
+ RETURN_IF_ERROR(InitSlotIdFieldIdMap(env));
+ return Status::OK();
+}
+
+Status IcebergMetadataScanner::ScanMetadataTable(JNIEnv* env) {
+ env->CallObjectMethod(jmetadata_scanner_,
+ iceberg_metadata_scanner_scan_metadata_table_);
+ RETURN_ERROR_IF_EXC(env);
+ return Status::OK();
+}
+
+Status IcebergMetadataScanner::InitSlotIdFieldIdMap(JNIEnv* env) {
+ for (const SlotDescriptor* slot_desc: tuple_desc_->slots()) {
+ int field_id = -1;
+ if (slot_desc->col_path().size() == 1) {
+ // Top level slots have ColumnDescriptors that store the field ids.
+ field_id =
tuple_desc_->table_desc()->GetColumnDesc(slot_desc).field_id();
+ } else {
+ // Non top level slots are fields of a nested type. This code path is to
handle
+ // slots that do not have their nested type's tuple available.
+ // This loop finds the struct ColumnType node that stores the slot as it
has the
+ // field id list of its children.
+ int root_type_index = slot_desc->col_path()[0];
+ const ColumnType* current_type =
+ &(tuple_desc_->table_desc()->col_descs()[root_type_index].type());
+ for (int i = 1; i < slot_desc->col_path().size() - 1; ++i) {
+ current_type = ¤t_type->children[slot_desc->col_path()[i]];
+ }
+ field_id = current_type->field_ids[slot_desc->col_path().back()];
+ }
+ DCHECK_NE(field_id, -1);
+ slot_id_to_field_id_map_[slot_desc->id()] = field_id;
+ if (slot_desc->type().IsStructType()) {
+ RETURN_IF_ERROR(InitSlotIdFieldIdMapForStruct(env, slot_desc));
+ }
+ }
+ return Status::OK();
+}
+
+Status IcebergMetadataScanner::InitSlotIdFieldIdMapForStruct(JNIEnv* env,
+ const SlotDescriptor* struct_slot_desc) {
+ DCHECK(struct_slot_desc->type().IsStructType());
+ const std::vector<int>& struct_field_ids =
struct_slot_desc->type().field_ids;
+ for (const SlotDescriptor* child_slot_desc:
+ struct_slot_desc->children_tuple_descriptor()->slots()) {
+ int field_id = struct_field_ids[child_slot_desc->col_path().back()];
+ slot_id_to_field_id_map_[child_slot_desc->id()] = field_id;
+ if (child_slot_desc->type().IsStructType()) {
+ RETURN_IF_ERROR(InitSlotIdFieldIdMapForStruct(env, child_slot_desc));
+ }
+ }
+ return Status::OK();
+}
+
+Status IcebergMetadataScanner::GetNext(JNIEnv* env, jobject* result) {
+ *result = env->CallObjectMethod(jmetadata_scanner_,
iceberg_metadata_scanner_get_next_);
+ RETURN_ERROR_IF_EXC(env);
+ return Status::OK();
+}
+
+Status IcebergMetadataScanner::GetNextArrayItem(JNIEnv* env, const jobject
&scanner,
+ jobject* result) {
+ *result = env->CallObjectMethod(scanner,
+ iceberg_metadata_scanner_array_scanner_get_next_array_item_);
+ RETURN_ERROR_IF_EXC(env);
+ return Status::OK();
+}
+
+Status IcebergMetadataScanner::GetValue(JNIEnv* env, const SlotDescriptor*
slot_desc,
+ const jobject &struct_like_row, const jclass& clazz, jobject* result) {
+ DCHECK(slot_desc != nullptr);
+ auto field_id_it = slot_id_to_field_id_map_.find(slot_desc->id());
+ if (field_id_it != slot_id_to_field_id_map_.end()) {
+ // Use accessor when it is available, these are top level primitive types,
top level
+ // structs and structs inside structs.
+ RETURN_IF_ERROR(GetValueByFieldId(env, struct_like_row,
field_id_it->second, result));
+ } else if (slot_desc->parent()->isTupleOfStructSlot()) {
+ // Accessor is not available, this must be a STRUCT inside an ARRAY.
+ int pos = slot_desc->col_path().back();
+ RETURN_IF_ERROR(GetValueByPosition(env, struct_like_row, pos, clazz,
result));
+ } else {
+ // Primitive inside an ARRAY, the value can be accessed directly.
+ *result = struct_like_row;
+ }
+ return Status::OK();
+}
+
+Status IcebergMetadataScanner::GetValueByFieldId(JNIEnv* env, const jobject
&struct_like,
+ int field_id, jobject* result) {
+ *result = env->CallObjectMethod(jmetadata_scanner_,
+ iceberg_metadata_scanner_get_value_by_field_id_, struct_like, field_id);
+ RETURN_ERROR_IF_EXC(env);
+ return Status::OK();
+}
+
+Status IcebergMetadataScanner::GetValueByPosition(JNIEnv* env, const jobject
&struct_like,
+ int pos, const jclass &clazz, jobject* result) {
+ *result = env->CallObjectMethod(jmetadata_scanner_,
+ iceberg_metadata_scanner_get_value_by_position_, struct_like, pos,
clazz);
+ RETURN_ERROR_IF_EXC(env);
+ return Status::OK();
+}
+
+Status IcebergMetadataScanner::CreateArrayScanner(JNIEnv* env, const jobject
&list,
+ jobject& result) {
+ result = env->NewObject(iceberg_metadata_scanner_array_scanner_cl_,
+ iceberg_metadata_scanner_array_scanner_ctor_, jmetadata_scanner_, list);
+ RETURN_ERROR_IF_EXC(env);
+ return Status::OK();
+}
+
+void IcebergMetadataScanner::Close(RuntimeState* state) {
+ JNIEnv* env = JniUtil::GetJNIEnv();
+ if (env != nullptr) {
+ if (jmetadata_scanner_ != nullptr)
env->DeleteGlobalRef(jmetadata_scanner_);
+ }
+}
+
+string IcebergMetadataScanner::DebugString() {
+ std::stringstream out;
+ out << "IcebergMetadataScanner: [ Metadata table name: "
+ << metadata_table_name_ << "; ";
+ std::unordered_map<impala::SlotId, int>::iterator it =
slot_id_to_field_id_map_.begin();
+ if (it != slot_id_to_field_id_map_.end()) {
+ out << "SlotId to FieldId map: [" << it->first << " : " << it->second;
+ for (it++; it != slot_id_to_field_id_map_.end(); it++) {
+ out << ", " << it->first << " : " << it->second;
+ }
+ out << "]; ";
+ }
+ out << tuple_desc_->DebugString() << "]";
+ return out.str();
+}
+
+}
\ No newline at end of file
diff --git a/be/src/exec/iceberg-metadata/iceberg-metadata-scanner.h
b/be/src/exec/iceberg-metadata/iceberg-metadata-scanner.h
new file mode 100644
index 000000000..034cee14e
--- /dev/null
+++ b/be/src/exec/iceberg-metadata/iceberg-metadata-scanner.h
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/global-types.h"
+#include "common/status.h"
+#include "runtime/descriptors.h"
+
+#include <jni.h>
+
+namespace impala {
+
+class RuntimeState;
+
+/// Adapter class of the FE IcebergMetadataScanner, wraps the JNI calls as C++
methods.
+class IcebergMetadataScanner {
+ public:
+ IcebergMetadataScanner(jobject jtable, const char* metadata_table_name,
+ const TupleDescriptor* tuple_desc);
+
+ /// JNI setup. Creates global references for Java classes and finds method
ids.
+ /// Initializes static members, should be called once per process lifecycle.
+ static Status InitJNI() WARN_UNUSED_RESULT;
+
+ // Initializes this object, creates the java metadata scanner object.
+ Status Init(JNIEnv* env) WARN_UNUSED_RESULT;
+
+ /// Executes an Iceberg scan through JNI.
+ Status ScanMetadataTable(JNIEnv* env);
+
+ /// Gets the next row of 'org.apache.impala.util.IcebergMetadataScanner'.
+ Status GetNext(JNIEnv* env, jobject* result);
+
+ /// Wrapper over value access methods, decides whether to access the value
by accessor
+ /// or by position.
+ Status GetValue(JNIEnv* env, const SlotDescriptor* slot_desc,
+ const jobject &struct_like_row, const jclass &clazz, jobject* result);
+
+ /// Creates a Java ArrayScanner object that can be used to access Array
items.
+ /// Note that it returns a GlobalRef, that has to be released explicitly.
+ Status CreateArrayScanner(JNIEnv* env, const jobject &list, jobject& result);
+
+ /// Gets the next item of
'org.apache.impala.util.IcebergMetadataScanner.ArrayScanner'.
+ Status GetNextArrayItem(JNIEnv* env, const jobject &list, jobject* result);
+
+ /// Removes global references.
+ void Close(RuntimeState* state);
+
+ private:
+ /// Global class references created with JniUtil.
+ inline static jclass accessor_cl_ = nullptr;
+ inline static jclass iceberg_metadata_scanner_cl_ = nullptr;
+ inline static jclass iceberg_metadata_scanner_array_scanner_cl_ = nullptr;
+
+ /// Method references created with JniUtil.
+ inline static jmethodID accessor_get_ = nullptr;
+ inline static jmethodID iceberg_metadata_scanner_ctor_ = nullptr;
+ inline static jmethodID iceberg_metadata_scanner_scan_metadata_table_ =
nullptr;
+ inline static jmethodID iceberg_metadata_scanner_get_next_ = nullptr;
+ inline static jmethodID iceberg_metadata_scanner_get_value_by_field_id_ =
nullptr;
+ inline static jmethodID iceberg_metadata_scanner_get_value_by_position_ =
nullptr;
+ inline static jmethodID iceberg_metadata_scanner_array_scanner_ctor_ =
nullptr;
+ inline static jmethodID
iceberg_metadata_scanner_array_scanner_get_next_array_item_ =
+ nullptr;
+
+ /// The Impala FeTable object in Java, used to scan the metadata table.
+ jobject jtable_;
+
+ /// The name of the metadata table, used to identify which metadata table is
needed.
+ const char* metadata_table_name_;
+
+ /// Top level TupleDescriptor.
+ const TupleDescriptor* tuple_desc_;
+
+ /// Iceberg metadata scanner Java object, it helps preparing the metadata
table and
+ /// executes an Iceberg table scan. Allows the ScanNode to fetch the
metadata from
+ /// the Java Heap.
+ jobject jmetadata_scanner_;
+
+ /// Maps the SlotId to a FieldId, used when obtaining an Accessor for a
field.
+ std::unordered_map<SlotId, int> slot_id_to_field_id_map_;
+
+ /// Populates the slot_id_to_field_id_map_ map, with the field id an
accessor can be
+ /// obtained from the Iceberg library.
+ /// For primitive type columns that are not a field of a struct, this can be
found in
+ /// the ColumnDescriptor. However, ColumnDescriptors are not available for
struct
+ /// fields, in this case the InitSlotIdFieldIdMapForStruct can be used.
+ /// Collection types cannot be accessed through accessors, so those field
ids won't be
+ /// part of this map.
+ Status InitSlotIdFieldIdMap(JNIEnv* env);
+
+ /// Recursive part of the slot_id_to_field_id_map_ collection, when there is
a struct in
+ /// the tuple. Collects the field ids of the struct members. The type_ field
inside the
+ /// struct slot stores an ordered list of Iceberg Struct member field ids.
This list can
+ /// be indexed with the last element of SchemaPath col_path to obtain the
correct field
+ /// id of the struct member.
+ Status InitSlotIdFieldIdMapForStruct(JNIEnv* env,
+ const SlotDescriptor* struct_slot_desc);
+
+ /// Wrappers around the Java methods.
+ Status GetValueByFieldId(JNIEnv* env, const jobject &struct_like, int
field_id,
+ jobject* result);
+ Status GetValueByPosition(JNIEnv* env, const jobject &struct_like, int pos,
+ const jclass &clazz, jobject* result);
+
+ string DebugString();
+};
+}
diff --git a/be/src/exec/iceberg-metadata/iceberg-row-reader.cc
b/be/src/exec/iceberg-metadata/iceberg-row-reader.cc
index d2f680222..5c615cad0 100644
--- a/be/src/exec/iceberg-metadata/iceberg-row-reader.cc
+++ b/be/src/exec/iceberg-metadata/iceberg-row-reader.cc
@@ -16,7 +16,10 @@
// under the License.
#include "exec/exec-node.inline.h"
+#include "exec/iceberg-metadata/iceberg-metadata-scanner.h"
#include "exec/iceberg-metadata/iceberg-row-reader.h"
+#include "exec/scan-node.h"
+#include "runtime/collection-value-builder.h"
#include "runtime/runtime-state.h"
#include "runtime/timestamp-value.inline.h"
#include "runtime/tuple-row.h"
@@ -24,59 +27,52 @@
namespace impala {
-IcebergRowReader::IcebergRowReader(const std::unordered_map<SlotId, jobject>&
jaccessors)
- : jaccessors_(jaccessors) {}
+IcebergRowReader::IcebergRowReader(ScanNode* scan_node,
+ IcebergMetadataScanner* metadata_scanner)
+ : scan_node_(scan_node),
+ metadata_scanner_(metadata_scanner) {}
Status IcebergRowReader::InitJNI() {
- DCHECK(iceberg_accessor_cl_ == nullptr) << "InitJNI() already called!";
+ DCHECK(list_cl_ == nullptr) << "InitJNI() already called!";
JNIEnv* env = JniUtil::GetJNIEnv();
if (env == nullptr) return Status("Failed to get/create JVM");
+
// Global class references:
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env,
- "org/apache/iceberg/Accessor", &iceberg_accessor_cl_));
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env,
- "java/util/List", &list_cl_));
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env,
- "org/apache/iceberg/types/Types$NestedField",
&iceberg_nested_field_cl_));
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env,
- "java/lang/Boolean", &java_boolean_cl_));
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env,
- "java/lang/Integer", &java_int_cl_));
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env,
- "java/lang/Long", &java_long_cl_));
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env,
- "java/lang/CharSequence", &java_char_sequence_cl_));
+ RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/util/List",
&list_cl_));
+ RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Boolean",
&boolean_cl_));
+ RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Integer",
&integer_cl_));
+ RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Long",
&long_cl_));
+ RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/CharSequence",
+ &char_sequence_cl_));
// Method ids:
- RETURN_IF_ERROR(JniUtil::GetMethodID(env, list_cl_, "get",
- "(I)Ljava/lang/Object;", &list_get_));
- RETURN_IF_ERROR(JniUtil::GetMethodID(env, iceberg_accessor_cl_, "get",
- "(Ljava/lang/Object;)Ljava/lang/Object;", &iceberg_accessor_get_));
- RETURN_IF_ERROR(JniUtil::GetMethodID(env, java_boolean_cl_, "booleanValue",
"()Z",
+ RETURN_IF_ERROR(JniUtil::GetMethodID(env, list_cl_, "get",
"(I)Ljava/lang/Object;",
+ &list_get_));
+ RETURN_IF_ERROR(JniUtil::GetMethodID(env, list_cl_, "size", "()I",
&list_size_));
+ RETURN_IF_ERROR(JniUtil::GetMethodID(env, boolean_cl_, "booleanValue", "()Z",
&boolean_value_));
- RETURN_IF_ERROR(JniUtil::GetMethodID(env, java_int_cl_, "intValue", "()I",
- &int_value_));
- RETURN_IF_ERROR(JniUtil::GetMethodID(env, java_long_cl_, "longValue", "()J",
+ RETURN_IF_ERROR(JniUtil::GetMethodID(env, integer_cl_, "intValue", "()I",
+ &integer_value_));
+ RETURN_IF_ERROR(JniUtil::GetMethodID(env, long_cl_, "longValue", "()J",
&long_value_));
- RETURN_IF_ERROR(JniUtil::GetMethodID(env, java_char_sequence_cl_, "toString",
+ RETURN_IF_ERROR(JniUtil::GetMethodID(env, char_sequence_cl_, "toString",
"()Ljava/lang/String;", &char_sequence_to_string_));
return Status::OK();
}
Status IcebergRowReader::MaterializeTuple(JNIEnv* env,
jobject struct_like_row, const TupleDescriptor* tuple_desc, Tuple* tuple,
- MemPool* tuple_data_pool) {
+ MemPool* tuple_data_pool, RuntimeState* state) {
DCHECK(env != nullptr);
DCHECK(struct_like_row != nullptr);
DCHECK(tuple != nullptr);
DCHECK(tuple_data_pool != nullptr);
DCHECK(tuple_desc != nullptr);
- for (SlotDescriptor* slot_desc: tuple_desc->slots()) {
- jobject accessor = jaccessors_.at(slot_desc->id());
- jobject accessed_value = env->CallObjectMethod(accessor,
iceberg_accessor_get_,
- struct_like_row);
- RETURN_ERROR_IF_EXC(env);
+ for (const SlotDescriptor* slot_desc: tuple_desc->slots()) {
+ jobject accessed_value;
+ RETURN_IF_ERROR(metadata_scanner_->GetValue(env, slot_desc,
struct_like_row,
+ JavaClassFromImpalaType(slot_desc->type()), &accessed_value));
if (accessed_value == nullptr) {
tuple->SetNull(slot_desc->null_indicator_offset());
continue;
@@ -98,9 +94,13 @@ Status IcebergRowReader::MaterializeTuple(JNIEnv* env,
} case TYPE_STRING: { // java.lang.String
RETURN_IF_ERROR(WriteStringSlot(env, accessed_value, slot,
tuple_data_pool));
break;
- } case TYPE_STRUCT: {
+ } case TYPE_STRUCT: { // Struct type is not used by Impala to access
values.
RETURN_IF_ERROR(WriteStructSlot(env, struct_like_row, slot_desc, tuple,
- tuple_data_pool));
+ tuple_data_pool, state));
+ break;
+    } case TYPE_ARRAY: { // java.util.List
+      RETURN_IF_ERROR(WriteArraySlot(env, accessed_value,
(CollectionValue*)slot,
+          slot_desc, tuple, tuple_data_pool, state));
break;
}
default:
@@ -108,42 +108,46 @@ Status IcebergRowReader::MaterializeTuple(JNIEnv* env,
tuple->SetNull(slot_desc->null_indicator_offset());
VLOG(3) << "Skipping unsupported column type: " <<
slot_desc->type().type;
}
+ env->DeleteLocalRef(accessed_value);
+ RETURN_ERROR_IF_EXC(env);
}
return Status::OK();
}
-Status IcebergRowReader::WriteBooleanSlot(JNIEnv* env, jobject accessed_value,
+Status IcebergRowReader::WriteBooleanSlot(JNIEnv* env, const jobject
&accessed_value,
void* slot) {
DCHECK(accessed_value != nullptr);
- DCHECK(env->IsInstanceOf(accessed_value, java_boolean_cl_) == JNI_TRUE);
+ DCHECK(env->IsInstanceOf(accessed_value, boolean_cl_) == JNI_TRUE);
jboolean result = env->CallBooleanMethod(accessed_value, boolean_value_);
RETURN_ERROR_IF_EXC(env);
*reinterpret_cast<bool*>(slot) = (bool)(result == JNI_TRUE);
return Status::OK();
}
-Status IcebergRowReader::WriteIntSlot(JNIEnv* env, jobject accessed_value,
void* slot) {
+Status IcebergRowReader::WriteIntSlot(JNIEnv* env, const jobject
&accessed_value,
+ void* slot) {
DCHECK(accessed_value != nullptr);
- DCHECK(env->IsInstanceOf(accessed_value, java_int_cl_) == JNI_TRUE);
- jint result = env->CallIntMethod(accessed_value, int_value_);
+ DCHECK(env->IsInstanceOf(accessed_value, integer_cl_) == JNI_TRUE);
+ jint result = env->CallIntMethod(accessed_value, integer_value_);
RETURN_ERROR_IF_EXC(env);
*reinterpret_cast<int32_t*>(slot) = reinterpret_cast<int32_t>(result);
return Status::OK();
}
-Status IcebergRowReader::WriteLongSlot(JNIEnv* env, jobject accessed_value,
void* slot) {
+Status IcebergRowReader::WriteLongSlot(JNIEnv* env, const jobject
&accessed_value,
+ void* slot) {
DCHECK(accessed_value != nullptr);
- DCHECK(env->IsInstanceOf(accessed_value, java_long_cl_) == JNI_TRUE);
+ DCHECK(env->IsInstanceOf(accessed_value, long_cl_) == JNI_TRUE);
jlong result = env->CallLongMethod(accessed_value, long_value_);
RETURN_ERROR_IF_EXC(env);
*reinterpret_cast<int64_t*>(slot) = reinterpret_cast<int64_t>(result);
return Status::OK();
}
-Status IcebergRowReader::WriteTimeStampSlot(JNIEnv* env, jobject
accessed_value,
+Status IcebergRowReader::WriteTimeStampSlot(JNIEnv* env, const jobject
&accessed_value,
void* slot) {
DCHECK(accessed_value != nullptr);
- DCHECK(env->IsInstanceOf(accessed_value, java_long_cl_) == JNI_TRUE);
+ DCHECK(env->IsInstanceOf(accessed_value, long_cl_) == JNI_TRUE);
jlong result = env->CallLongMethod(accessed_value, long_value_);
RETURN_ERROR_IF_EXC(env);
*reinterpret_cast<TimestampValue*>(slot) =
TimestampValue::FromUnixTimeMicros(result,
@@ -151,10 +155,10 @@ Status IcebergRowReader::WriteTimeStampSlot(JNIEnv* env,
jobject accessed_value,
return Status::OK();
}
-Status IcebergRowReader::WriteStringSlot(JNIEnv* env, jobject accessed_value,
void* slot,
- MemPool* tuple_data_pool) {
+Status IcebergRowReader::WriteStringSlot(JNIEnv* env, const jobject
&accessed_value,
+ void* slot, MemPool* tuple_data_pool) {
DCHECK(accessed_value != nullptr);
- DCHECK(env->IsInstanceOf(accessed_value, java_char_sequence_cl_) ==
JNI_TRUE);
+ DCHECK(env->IsInstanceOf(accessed_value, char_sequence_cl_) == JNI_TRUE);
jstring result = static_cast<jstring>(env->CallObjectMethod(accessed_value,
char_sequence_to_string_));
RETURN_ERROR_IF_EXC(env);
@@ -173,12 +177,79 @@ Status IcebergRowReader::WriteStringSlot(JNIEnv* env,
jobject accessed_value, vo
return Status::OK();
}
-Status IcebergRowReader::WriteStructSlot(JNIEnv* env, jobject struct_like_row,
- SlotDescriptor* slot_desc, Tuple* tuple, MemPool* tuple_data_pool) {
+Status IcebergRowReader::WriteStructSlot(JNIEnv* env, const jobject
&struct_like_row,
+ const SlotDescriptor* slot_desc, Tuple* tuple, MemPool* tuple_data_pool,
+ RuntimeState* state) {
DCHECK(slot_desc != nullptr);
+ DCHECK(struct_like_row != nullptr);
+ DCHECK(slot_desc->type().IsStructType());
RETURN_IF_ERROR(MaterializeTuple(env, struct_like_row,
- slot_desc->children_tuple_descriptor(), tuple, tuple_data_pool));
+ slot_desc->children_tuple_descriptor(), tuple, tuple_data_pool, state));
return Status::OK();
}
+Status IcebergRowReader::WriteArraySlot(JNIEnv* env, const jobject
&struct_like_row,
+ CollectionValue* slot, const SlotDescriptor* slot_desc, Tuple* tuple,
+ MemPool* tuple_data_pool, RuntimeState* state) {
+ DCHECK(slot_desc != nullptr);
+ DCHECK(slot_desc->type().IsCollectionType());
+ DCHECK(env->IsInstanceOf(struct_like_row, list_cl_) == JNI_TRUE);
+ const TupleDescriptor* item_tuple_desc =
slot_desc->children_tuple_descriptor();
+ *slot = CollectionValue();
+ CollectionValueBuilder coll_value_builder(slot, *item_tuple_desc,
tuple_data_pool,
+ state);
+ jobject array_scanner;
+ RETURN_IF_ERROR(metadata_scanner_->CreateArrayScanner(env, struct_like_row,
+ array_scanner));
+ int remaining_array_size = env->CallIntMethod(struct_like_row, list_size_);
+ RETURN_ERROR_IF_EXC(env);
+ while (!scan_node_->ReachedLimit() && remaining_array_size > 0) {
+ RETURN_IF_CANCELLED(state);
+ int num_tuples;
+ MemPool* tuple_data_pool_collection = coll_value_builder.pool();
+ Tuple* tuple;
+ RETURN_IF_ERROR(coll_value_builder.GetFreeMemory(&tuple, &num_tuples));
+ // 'num_tuples' can be very high if we're writing to a large
CollectionValue. Limit
+ // the number of tuples we read at one time so we don't spend too long in
the
+ // 'num_tuples' loop below before checking for cancellation or limit
reached.
+ num_tuples = std::min(num_tuples,
scan_node_->runtime_state()->batch_size());
+ int num_to_commit = 0;
+ while (num_to_commit < num_tuples && remaining_array_size > 0) {
+ tuple->Init(item_tuple_desc->byte_size());
+ jobject item;
+ RETURN_IF_ERROR(metadata_scanner_->GetNextArrayItem(env, array_scanner,
&item));
+ RETURN_IF_ERROR(MaterializeTuple(env, item, item_tuple_desc, tuple,
+ tuple_data_pool_collection, state));
+ // For filtering please see IMPALA-12853.
+ tuple += item_tuple_desc->byte_size();
+ ++num_to_commit;
+ --remaining_array_size;
+ }
+ coll_value_builder.CommitTuples(num_to_commit);
+ }
+ env->DeleteLocalRef(array_scanner);
+ RETURN_ERROR_IF_EXC(env);
+ return Status::OK();
+}
+
+jclass IcebergRowReader::JavaClassFromImpalaType(const ColumnType type) {
+ switch (type.type) {
+ case TYPE_BOOLEAN: { // java.lang.Boolean
+ return boolean_cl_;
+ } case TYPE_INT: { // java.lang.Integer
+ return integer_cl_;
+ } case TYPE_BIGINT: // java.lang.Long
+ case TYPE_TIMESTAMP: { // org.apache.iceberg.types.TimestampType
+ return long_cl_;
+ } case TYPE_STRING: { // java.lang.String
+ return char_sequence_cl_;
+    } case TYPE_ARRAY: { // java.util.List
+      return list_cl_;
+    }
+ default:
+ VLOG(3) << "Skipping unsupported column type: " << type.type;
+ }
+ return nullptr;
+}
+
}
\ No newline at end of file
diff --git a/be/src/exec/iceberg-metadata/iceberg-row-reader.h
b/be/src/exec/iceberg-metadata/iceberg-row-reader.h
index 51395df20..4e7dafd9a 100644
--- a/be/src/exec/iceberg-metadata/iceberg-row-reader.h
+++ b/be/src/exec/iceberg-metadata/iceberg-row-reader.h
@@ -18,71 +18,87 @@
#pragma once
#include "common/global-types.h"
+#include "common/status.h"
+#include "runtime/collection-value-builder.h"
#include <jni.h>
#include <unordered_map>
namespace impala {
+class IcebergMetadataScanner;
class MemPool;
+class RuntimeState;
+class ScanNode;
class Status;
class SlotDescriptor;
class Tuple;
class TupleDescriptor;
-/// Row reader for Iceberg table scans, it translates a {StructLike} Java
object to Impala
-/// rows. It utilizes the provided {Accessor} objects to do this translation.
+/// Row reader for Iceberg table scans, it translates a {StructLike} Java
object to an
+/// Impala row. It utilizes IcebergMetadataScanner to handle this translation.
class IcebergRowReader {
public:
- /// Initialize the tuple descriptor and accessors
- IcebergRowReader(const std::unordered_map<SlotId, jobject>& jaccessors);
+ IcebergRowReader(ScanNode* scan_node, IcebergMetadataScanner*
metadata_scanner);
- /// JNI setup. Create global references for Java classes and find method ids.
+ /// JNI setup. Creates global references for Java classes and finds method
ids.
/// Initializes static members, should be called once per process lifecycle.
- static Status InitJNI();
+ static Status InitJNI() WARN_UNUSED_RESULT;
/// Materialize the StructLike Java objects into Impala rows.
Status MaterializeTuple(JNIEnv* env, jobject struct_like_row,
- const TupleDescriptor* tuple_desc, Tuple* tuple, MemPool*
tuple_data_pool);
+ const TupleDescriptor* tuple_desc, Tuple* tuple, MemPool*
tuple_data_pool,
+ RuntimeState* state);
private:
/// Global class references created with JniUtil.
- inline static jclass iceberg_accessor_cl_ = nullptr;
- inline static jclass iceberg_nested_field_cl_ = nullptr;
inline static jclass list_cl_ = nullptr;
- inline static jclass java_boolean_cl_ = nullptr;
- inline static jclass java_int_cl_ = nullptr;
- inline static jclass java_long_cl_ = nullptr;
- inline static jclass java_char_sequence_cl_ = nullptr;
+ inline static jclass boolean_cl_ = nullptr;
+ inline static jclass integer_cl_ = nullptr;
+ inline static jclass long_cl_ = nullptr;
+ inline static jclass char_sequence_cl_ = nullptr;
/// Method references created with JniUtil.
- inline static jmethodID iceberg_accessor_get_ = nullptr;
inline static jmethodID list_get_ = nullptr;
+ inline static jmethodID list_size_ = nullptr;
inline static jmethodID boolean_value_ = nullptr;
- inline static jmethodID int_value_ = nullptr;
+ inline static jmethodID integer_value_ = nullptr;
inline static jmethodID long_value_ = nullptr;
inline static jmethodID char_sequence_to_string_ = nullptr;
- /// Accessor map for the scan result, pairs the slot ids with the java
Accessor
- /// objects.
- const std::unordered_map<SlotId, jobject> jaccessors_;
+
+ /// The scan node that started this row reader.
+ ScanNode* scan_node_;
+
+ /// IcebergMetadataScanner class, used to get and access values inside java
objects.
+ IcebergMetadataScanner* metadata_scanner_;
/// Reads the value of a primitive from the StructLike, translates it to a
matching
/// Impala type and writes it into the target tuple. The related Accessor
objects are
/// stored in the jaccessors_ map and created during Prepare.
- Status WriteBooleanSlot(JNIEnv* env, jobject accessed_value, void* slot);
- Status WriteIntSlot(JNIEnv* env, jobject accessed_value, void* slot);
- Status WriteLongSlot(JNIEnv* env, jobject accessed_value, void* slot);
+ Status WriteBooleanSlot(JNIEnv* env, const jobject &accessed_value, void*
slot);
+ Status WriteIntSlot(JNIEnv* env, const jobject &accessed_value, void* slot);
+ Status WriteLongSlot(JNIEnv* env, const jobject &accessed_value, void* slot);
/// Iceberg TimeStamp is parsed into TimestampValue.
- Status WriteTimeStampSlot(JNIEnv* env, jobject accessed_value, void* slot);
+ Status WriteTimeStampSlot(JNIEnv* env, const jobject &accessed_value, void*
slot);
/// To obtain a character sequence from JNI the JniUtfCharGuard class is
used. Then the
/// data has to be copied to the tuple_data_pool, because the JVM releases
the reference
/// and reclaims the memory area.
- Status WriteStringSlot(JNIEnv* env, jobject accessed_value, void* slot,
+ Status WriteStringSlot(JNIEnv* env, const jobject &accessed_value, void*
slot,
MemPool* tuple_data_pool);
- /// Recursively calls MaterializeTuple method with the child tuple of the
struct slot.
- Status WriteStructSlot(JNIEnv* env, jobject struct_like_row, SlotDescriptor*
slot_desc,
- Tuple* tuple, MemPool* tuple_data_pool);
+
+ /// Nested types recursively call MaterializeTuple method with their child
tuple.
+ Status WriteStructSlot(JNIEnv* env, const jobject &struct_like_row,
+ const SlotDescriptor* slot_desc, Tuple* tuple, MemPool* tuple_data_pool,
+ RuntimeState* state);
+ Status WriteArraySlot(JNIEnv* env, const jobject &accessed_value,
CollectionValue* slot,
+ const SlotDescriptor* slot_desc, Tuple* tuple, MemPool* tuple_data_pool,
+ RuntimeState* state);
+
+ /// Helper method that gives back the Iceberg Java class for a ColumnType.
It is
+ /// specified in this class, to avoid defining all the Java type classes in
other
+ /// classes.
+ jclass JavaClassFromImpalaType(const ColumnType type);
};
}
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index ed9159ba2..a37e25dae 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -29,7 +29,7 @@
#include "common/status.h"
#include "exec/hbase/hbase-table-scanner.h"
#include "exec/hbase/hbase-table-writer.h"
-#include "exec/iceberg-metadata/iceberg-metadata-scan-node.h"
+#include "exec/iceberg-metadata/iceberg-metadata-scanner.h"
#include "exec/iceberg-metadata/iceberg-row-reader.h"
#include "exprs/hive-udf-call.h"
#include "exprs/timezone_db.h"
@@ -66,7 +66,7 @@ int ImpaladMain(int argc, char** argv) {
ABORT_IF_ERROR(HBaseTableScanner::Init());
ABORT_IF_ERROR(HBaseTable::InitJNI());
ABORT_IF_ERROR(HBaseTableWriter::InitJNI());
- ABORT_IF_ERROR(IcebergMetadataScanNode::InitJNI());
+ ABORT_IF_ERROR(IcebergMetadataScanner::InitJNI());
ABORT_IF_ERROR(IcebergRowReader::InitJNI());
ABORT_IF_ERROR(HiveUdfCall::InitEnv());
ABORT_IF_ERROR(JniCatalogCacheUpdateIterator::InitJNI());
diff --git a/fe/src/main/java/org/apache/impala/analysis/FromClause.java
b/fe/src/main/java/org/apache/impala/analysis/FromClause.java
index 306b5af86..ca731f907 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FromClause.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FromClause.java
@@ -165,9 +165,10 @@ public class FromClause extends StmtNode implements
Iterable<TableRef> {
Preconditions.checkNotNull(tblRef.getDesc());
Preconditions.checkNotNull(tblRef.getDesc().getPath());
Preconditions.checkNotNull(tblRef.getDesc().getPath().getRootTable());
+ // IMPALA-12853: Collection types in FROM clause for Iceberg Metadata
Tables
if (tblRef.getDesc().getPath().getRootTable() instanceof
IcebergMetadataTable) {
- throw new AnalysisException("Querying collection types (ARRAY/MAP) is
not " +
- "supported for Iceberg Metadata tables. (IMPALA-12610,
IMPALA-12611)");
+ throw new AnalysisException("Querying collection types (ARRAY/MAP) in
FROM " +
+ "clause is not supported for Iceberg Metadata tables.");
}
}
diff --git
a/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java
b/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java
index 37087da43..b514269ae 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java
@@ -17,8 +17,12 @@
package org.apache.impala.util;
+import com.cloudera.cloud.storage.relocated.protobuf.Struct;
import com.google.common.base.Preconditions;
+import java.util.Iterator;
+import java.util.List;
+
import org.apache.iceberg.Accessor;
import org.apache.iceberg.DataTask;
import org.apache.iceberg.FileScanTask;
@@ -26,6 +30,8 @@ import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.StructLike;
import org.apache.impala.catalog.FeIcebergTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterator;
@@ -39,14 +45,16 @@ import org.apache.iceberg.io.CloseableIterator;
* caller of {@code IcebergMetadataScanner}.
*/
public class IcebergMetadataScanner {
- // FeTable object is extracted by the backend and passed when this object is
created
- private FeIcebergTable iceTbl_ = null;
+ private final static Logger LOG =
LoggerFactory.getLogger(IcebergMetadataScanner.class);
- // Metadata table
- private Table metadataTable_ = null;
+ // Metadata table instance.
+ final private Table metadataTable_;
+
+ // FeTable object is extracted by the backend and passed when this object is
created
+ final private FeIcebergTable iceTbl_;
// Name of the metadata table
- private String metadataTableName_;
+ final private String metadataTableName_;
// Persist the file scan task iterator so we can continue after a RowBatch
is full
private CloseableIterator<FileScanTask> fileScanTaskIterator_;
@@ -58,18 +66,8 @@ public class IcebergMetadataScanner {
Preconditions.checkNotNull(iceTbl);
this.iceTbl_ = (FeIcebergTable) iceTbl;
this.metadataTableName_ = metadataTableName;
- }
-
- /**
- * Iterates over the {{fileScanTaskIterator_}} to find a {FileScanTask} that
has rows.
- */
- public boolean FindFileScanTaskWithRows() {
- while (fileScanTaskIterator_.hasNext()) {
- DataTask dataTask = (DataTask)fileScanTaskIterator_.next();
- dataRowsIterator_ = dataTask.rows().iterator();
- if (dataRowsIterator_.hasNext()) return true;
- }
- return false;
+ this.metadataTable_ = MetadataTableUtils.createMetadataTableInstance(
+ iceTbl_.getIcebergApiTable(),
MetadataTableType.valueOf(metadataTableName_));
}
/**
@@ -80,8 +78,7 @@ public class IcebergMetadataScanner {
*/
public void ScanMetadataTable() {
// Create and scan the metadata table
- metadataTable_ = MetadataTableUtils.createMetadataTableInstance(
- iceTbl_.getIcebergApiTable(),
MetadataTableType.valueOf(metadataTableName_));
+ LOG.trace("Metadata table schema: " + metadataTable_.schema().toString());
TableScan scan = metadataTable_.newScan();
// Init the FileScanTask iterator and DataRowsIterator
fileScanTaskIterator_ = scan.planFiles().iterator();
@@ -89,11 +86,15 @@ public class IcebergMetadataScanner {
}
/**
- * Returns the field {Accessor} for the specified field id. This {Accessor}
then is
- * used to access a field in the {StructLike} object.
+ * Iterates over the {{fileScanTaskIterator_}} to find a {FileScanTask} that
has rows.
*/
- public Accessor GetAccessor(int fieldId) {
- return metadataTable_.schema().accessorForField(fieldId);
+ private boolean FindFileScanTaskWithRows() {
+ while (fileScanTaskIterator_.hasNext()) {
+ DataTask dataTask = (DataTask)fileScanTaskIterator_.next();
+ dataRowsIterator_ = dataTask.rows().iterator();
+ if (dataRowsIterator_.hasNext()) return true;
+ }
+ return false;
}
/**
@@ -112,4 +113,39 @@ public class IcebergMetadataScanner {
return null;
}
+ /**
+   * Uses the Accessor to access the value in the StructLike object. This only
works for
+   * non-collection types.
+ */
+ public Object GetValueByFieldId(StructLike structLike, int fieldId) {
+ Accessor accessor = metadataTable_.schema().accessorForField(fieldId);
+ return accessor.get(structLike);
+ }
+
+ /**
+ * Accesses the value inside the StructLike by its position.
+ */
+ public <T> T GetValueByPosition(StructLike structLike, int pos, Class<T>
javaClass)
+ {
+ return structLike.get(pos, javaClass);
+ }
+
+ /**
+ * Wrapper around an array that is the result of a metadata table scan.
+ * It is used to avoid iterating over a list through JNI.
+ */
+ public class ArrayScanner<T> {
+ private Iterator<T> iterator;
+
+ public ArrayScanner(List<T> array) {
+ this.iterator = array.iterator();
+ Preconditions.checkNotNull(iterator);
+ LOG.trace("Created metadata table array scanner, array size: " +
array.size());
+ }
+
+ public T GetNextArrayItem() {
+ if (iterator.hasNext()) return iterator.next();
+ return null;
+ }
+ }
}
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
index 35141225c..9918adcd7 100644
---
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
+++
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
@@ -3,7 +3,7 @@
# from the manifests files that Iceberg adds. Therefore, the query results are
checked
# with regexp.
####
-# Test 0 : Query all the metadata tables once
+# Query all the metadata tables once
####
====
---- QUERY
@@ -176,7 +176,7 @@ row_regex:1,[1-9]\d*|0,[1-9]\d*|0,[1-9]\d*|0
INT,BIGINT,BIGINT,BIGINT
####
-# Test 1 : Test query empty table's metadata
+# Test query empty table's metadata
####
====
---- QUERY
@@ -229,7 +229,7 @@ select record_count + file_count from
functional_parquet.iceberg_query_metadata.
BIGINT
####
-# Test 3 : Test filtering
+# Test filtering
####
====
---- QUERY
@@ -317,7 +317,7 @@ BIGINT
====
####
-# Test 4 : Test joins
+# Test joins
####
====
---- QUERY
@@ -360,7 +360,7 @@ $OVERWRITE_SNAPSHOT_ID
BIGINT
####
-# Test 5 : Inline query
+# Inline query
####
====
---- QUERY
@@ -375,7 +375,7 @@ row_regex:[1-9]\d*|0
BIGINT
####
-# Test 6 : Complex types
+# Complex types
# Currently not supported, complex type slots are set to NULL (IMPALA-12205)
####
====
@@ -390,7 +390,7 @@ row_regex:[1-9]\d*|0,'NULL'
BIGINT,STRING
####
-# Test 7 : Multiple RowBatch results
+# Multiple RowBatch results
####
====
---- QUERY
@@ -405,7 +405,6 @@
row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,t
TIMESTAMP,BIGINT,BIGINT,BOOLEAN
####
-# Test 8 : Timetravel
# Timetravel is not supported currently, related Jira IMPALA-11991.
####
====
@@ -446,7 +445,7 @@
row_regex:[1-9]\d*|0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/i
INT,STRING,BIGINT
####
-# Test 10 : Invalid operations
+# Invalid operations
# In most cases the parser catches the table reference.
####
====
@@ -485,9 +484,39 @@ alter table
functional_parquet.iceberg_query_metadata.snapshots add columns (col
---- CATCH
ParseException: Syntax error in line 1
====
+---- QUERY
+select i from
functional_parquet.iceberg_query_metadata.entries.readable_metrics;
+---- CATCH
+AnalysisException: Illegal table reference to non-collection type:
'functional_parquet.iceberg_query_metadata.entries.readable_metrics'
+====
+---- QUERY
+select delete_ids.item
+from functional_parquet.iceberg_query_metadata.all_files,
functional_parquet.iceberg_query_metadata.all_files.equality_ids delete_ids;
+---- CATCH
+AnalysisException: Querying collection types (ARRAY/MAP) in FROM clause is not
supported for Iceberg Metadata tables.
+====
+---- QUERY
+select null_value_counts.key, null_value_counts.value
+from functional_parquet.iceberg_query_metadata.all_files,
functional_parquet.iceberg_query_metadata.all_files.null_value_counts
null_value_counts;
+---- CATCH
+AnalysisException: Querying collection types (ARRAY/MAP) in FROM clause is not
supported for Iceberg Metadata tables.
+====
+---- QUERY
+select item
+from functional_parquet.iceberg_query_metadata.all_files a, a.equality_ids e,
e.delete_ids;
+---- CATCH
+AnalysisException: Querying collection types (ARRAY/MAP) in FROM clause is not
supported for Iceberg Metadata tables.
+====
+---- QUERY
+create view iceberg_query_metadata_all_files
+as select equality_ids from
functional_parquet.iceberg_query_metadata.all_files;
+select item from iceberg_query_metadata_all_files a, a.equality_ids e,
e.delete_ids;
+---- CATCH
+AnalysisException: Querying collection types (ARRAY/MAP) in FROM clause is not
supported for Iceberg Metadata tables.
+====
####
-# Test 11 : Query nested type columns
+# Query nested type columns
####
====
---- QUERY
@@ -562,39 +591,128 @@
row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_quer
---- TYPES
STRING,INT
====
+
+####
+# Query ARRAY type columns
+####
---- QUERY
-select i from
functional_parquet.iceberg_query_metadata.entries.readable_metrics;
----- CATCH
-AnalysisException: Illegal table reference to non-collection type:
'functional_parquet.iceberg_query_metadata.entries.readable_metrics'
+select equality_ids from
functional_parquet.iceberg_v2_delete_both_eq_and_pos.all_files;
+---- RESULTS
+'NULL'
+'[1,3]'
+'NULL'
+'NULL'
+'[1,3]'
+---- TYPES
+STRING
====
---- QUERY
-select delete_ids.item
-from functional_parquet.iceberg_query_metadata.all_files,
functional_parquet.iceberg_query_metadata.all_files.equality_ids delete_ids;
----- CATCH
-AnalysisException: Querying collection types (ARRAY/MAP) is not supported for
Iceberg Metadata tables. (IMPALA-12610, IMPALA-12611)
+select partition_summaries from
functional_parquet.iceberg_partitioned.all_manifests;
+---- RESULTS
+'[{"contains_null":false,"contains_nan":null,"lower_bound":"2020-01-01-08","upper_bound":"2020-01-01-10"},{"contains_null":false,"contains_nan":null,"lower_bound":"click","upper_bound":"view"}]'
+---- TYPES
+STRING
====
---- QUERY
-select null_value_counts.key, null_value_counts.value
-from functional_parquet.iceberg_query_metadata.all_files,
functional_parquet.iceberg_query_metadata.all_files.null_value_counts
null_value_counts;
----- CATCH
-AnalysisException: Querying collection types (ARRAY/MAP) is not supported for
Iceberg Metadata tables. (IMPALA-12610, IMPALA-12611)
+select file_path, content, equality_ids from
functional_parquet.iceberg_v2_delete_both_eq_and_pos.all_files
+order by file_path;
+---- RESULTS
+'/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_delete_both_eq_and_pos/data/00000-0-38a471ff-46f4-4350-85cc-2e7ba946b34c-00001.parquet',0,'NULL'
+'/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_delete_both_eq_and_pos/data/00000-0-38a471ff-46f4-4350-85cc-2e7ba946b34c-00002.parquet',2,'[1,3]'
+'/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_delete_both_eq_and_pos/data/00000-0-72709aba-fb15-4bd6-9758-5f39eb9bdcb7-00001.parquet',0,'NULL'
+'/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_delete_both_eq_and_pos/data/00000-0-72709aba-fb15-4bd6-9758-5f39eb9bdcb7-00002.parquet',2,'[1,3]'
+'/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_v2_delete_both_eq_and_pos/data/delete-074a9e19e61b766e-652a169e00000001_800513971_data.0.parq',1,'NULL'
+---- TYPES
+STRING,INT,STRING
====
---- QUERY
-select item
-from functional_parquet.iceberg_query_metadata.all_files a, a.equality_ids e,
e.delete_ids;
----- CATCH
-AnalysisException: Querying collection types (ARRAY/MAP) is not supported for
Iceberg Metadata tables. (IMPALA-12610, IMPALA-12611)
+select equality_ids, equality_ids from
functional_parquet.iceberg_v2_delete_both_eq_and_pos.all_files;
+---- RESULTS
+'NULL','NULL'
+'[1,3]','[1,3]'
+'NULL','NULL'
+'NULL','NULL'
+'[1,3]','[1,3]'
+---- TYPES
+STRING,STRING
====
---- QUERY
-create view iceberg_query_metadata_all_files
-as select equality_ids from
functional_parquet.iceberg_query_metadata.all_files;
-select item from iceberg_query_metadata_all_files a, a.equality_ids e,
e.delete_ids;
----- CATCH
-AnalysisException: Querying collection types (ARRAY/MAP) is not supported for
Iceberg Metadata tables. (IMPALA-12610, IMPALA-12611)
+select equality_ids from (select equality_ids from
functional_parquet.iceberg_v2_delete_both_eq_and_pos.all_files) s;
+---- RESULTS
+'NULL'
+'[1,3]'
+'NULL'
+'NULL'
+'[1,3]'
+---- TYPES
+STRING
+====
+---- QUERY
+with s as (select equality_ids from
functional_parquet.iceberg_v2_delete_both_eq_and_pos.all_files)
+select equality_ids from s;
+---- RESULTS
+'NULL'
+'[1,3]'
+'NULL'
+'NULL'
+'[1,3]'
+---- TYPES
+STRING
====
+---- QUERY
+# The following three queries are for specific use-cases:
+# Source:
https://www.apachecon.com/acna2022/slides/02_Ho_Icebergs_Best_Secret.pdf
+# How many files per partition:
+SELECT `partition`, file_count FROM
functional_parquet.iceberg_partitioned.`partitions`;
+---- RESULTS
+'{"event_time_hour":438296,"action":"view"}',8
+'{"event_time_hour":438297,"action":"click"}',6
+'{"event_time_hour":438298,"action":"download"}',6
+---- TYPES
+STRING,INT
+====
+---- QUERY
+# Total size of each partition:
+SELECT `partition`.event_time_hour as event_time_hour, `partition`.action as
action, sum(file_size_in_bytes)
+FROM functional_parquet.iceberg_partitioned.`files`
+GROUP BY event_time_hour, action;
+---- RESULTS
+438296,'view',9296
+438298,'download',7139
+438297,'click',7013
+---- TYPES
+INT,STRING,BIGINT
+====
+---- QUERY
+# Last update time per partition
+SELECT e.data_file.`partition`.event_time_hour as event_time_hour,
e.data_file.`partition`.action as action, MAX(s.committed_at) AS
last_modified_time
+FROM functional_parquet.iceberg_partitioned.snapshots s
+JOIN functional_parquet.iceberg_partitioned.entries e
+WHERE s.snapshot_id = e.snapshot_id
+GROUP BY event_time_hour, action;
+---- RESULTS
+438298,'download',2020-08-31 05:58:08.440000000
+438297,'click',2020-08-31 05:58:08.440000000
+438296,'view',2020-08-31 05:58:08.440000000
+---- TYPES
+INT,STRING,TIMESTAMP
+====
+
+####
+# Query MAP type columns
+####
+---- QUERY
+select null_value_counts from
functional_parquet.iceberg_query_metadata.all_files;
+---- RESULTS
+'NULL'
+'NULL'
+'NULL'
+'NULL'
+---- TYPES
+STRING
####
-# Test 12 : Describe all the metadata tables once
+# Describe all the metadata tables once
####
====
---- QUERY