This is an automated email from the ASF dual-hosted git repository.
szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 8b48ce50684 HIVE-26697: Create vectorized Hive DeleteFilter for
Iceberg tables (#3747) (Adam Szita, reviewed by Laszlo Pinter)
8b48ce50684 is described below
commit 8b48ce50684791d53dcb942958de233263cc421f
Author: Adam Szita <[email protected]>
AuthorDate: Fri Nov 11 16:30:57 2022 +0100
HIVE-26697: Create vectorized Hive DeleteFilter for Iceberg tables (#3747)
(Adam Szita, reviewed by Laszlo Pinter)
---
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 4 +-
.../iceberg/mr/hive/vector/HiveDeleteFilter.java | 178 +++++++++++++++++++++
.../mr/hive/vector/HiveVectorizedReader.java | 37 ++++-
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 8 +-
.../org/apache/iceberg/mr/hive/TestTables.java | 2 +-
.../hive/vector/TestHiveIcebergVectorization.java | 62 +++++++
.../test/results/positive/merge_iceberg_orc.q.out | 4 +
.../positive/merge_iceberg_partitioned_orc.q.out | 6 +
8 files changed, 291 insertions(+), 10 deletions(-)
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 91439beec3b..ada8eeb2b9e 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -908,7 +908,6 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
/**
* If any of the following checks is true we fall back to non vectorized
mode:
* <ul>
- * <li>iceberg format-version is "2"</li>
* <li>fileformat is set to avro</li>
* <li>querying metadata tables</li>
* <li>fileformat is set to ORC, and table schema has time type column</li>
@@ -918,8 +917,7 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
*/
private void fallbackToNonVectorizedModeBasedOnProperties(Properties
tableProps) {
Schema tableSchema =
SchemaParser.fromJson(tableProps.getProperty(InputFormatConfig.TABLE_SCHEMA));
- if ("2".equals(tableProps.get(TableProperties.FORMAT_VERSION)) ||
-
FileFormat.AVRO.name().equalsIgnoreCase(tableProps.getProperty(TableProperties.DEFAULT_FILE_FORMAT))
||
+ if
(FileFormat.AVRO.name().equalsIgnoreCase(tableProps.getProperty(TableProperties.DEFAULT_FILE_FORMAT))
||
(tableProps.containsKey("metaTable") &&
isValidMetadataTable(tableProps.getProperty("metaTable"))) ||
hasOrcTimeInSchema(tableProps, tableSchema) ||
!hasParquetNestedTypeWithinListOrMap(tableProps, tableSchema)) {
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveDeleteFilter.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveDeleteFilter.java
new file mode 100644
index 00000000000..a87470ae4d9
--- /dev/null
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveDeleteFilter.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.vector;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.NoSuchElementException;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+
+/**
+ * Delete filter implementation which consumes HiveRow instances.
+ */
+public class HiveDeleteFilter extends DeleteFilter<HiveRow> {
+
+ private final FileIO io;
+ private final HiveStructLike asStructLike;
+
+ public HiveDeleteFilter(FileIO io, FileScanTask task, Schema tableSchema,
Schema requestedSchema) {
+ super((task.file()).path().toString(), task.deletes(), tableSchema,
requestedSchema);
+ this.io = io;
+ this.asStructLike = new HiveStructLike(this.requiredSchema().asStruct());
+ }
+
+ @Override
+ protected StructLike asStructLike(HiveRow record) {
+ return asStructLike.wrap(record);
+ }
+
+ @Override
+ protected long pos(HiveRow record) {
+ return (long) record.get(MetadataColumns.ROW_POSITION.fieldId());
+ }
+
+ @Override
+ protected void markRowDeleted(HiveRow row) {
+ row.setDeleted(true);
+ }
+
+ @Override
+ protected InputFile getInputFile(String location) {
+ return this.io.newInputFile(location);
+ }
+
+ /**
+ * Adjusts the pipeline of incoming VRBs so that for each batch every row
goes through the delete filter.
+ * @param batches iterable of HiveBatchContexts i.e. VRBs and their meta
information
+ * @return the adjusted iterable of HiveBatchContexts
+ */
+ public CloseableIterable<HiveBatchContext>
filterBatch(CloseableIterable<HiveBatchContext> batches) {
+
+ // Delete filter pipeline setup logic:
+ // A HiveRow iterable (deleteInputIterable) is provided as input iterable
for the DeleteFilter.
+ // The content in deleteInputIterable is provided by row iterators from
the incoming VRBs i.e. on the arrival of
+ // a new batch the underlying iterator gets swapped.
+ SwappableHiveRowIterable deleteInputIterable = new
SwappableHiveRowIterable();
+
+ // Output iterable of DeleteFilter, and its iterator
+ CloseableIterable<HiveRow> deleteOutputIterable =
filter(deleteInputIterable);
+ CloseableIterator<HiveRow> deleteOutputIterator =
deleteOutputIterable.iterator();
+
+ return new CloseableIterable<HiveBatchContext>() {
+
+ @Override
+ public CloseableIterator<HiveBatchContext> iterator() {
+
+ CloseableIterator<HiveBatchContext> srcIterator = batches.iterator();
+
+ return new CloseableIterator<HiveBatchContext>() {
+
+ @Override
+ public boolean hasNext() {
+ return srcIterator.hasNext();
+ }
+
+ @Override
+ public HiveBatchContext next() {
+ try {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ HiveBatchContext currentBatchContext = srcIterator.next();
+ deleteInputIterable.currentRowIterator =
currentBatchContext.rowIterator();
+ VectorizedRowBatch batch = currentBatchContext.getBatch();
+
+ int oldSize = batch.size;
+ int newSize = 0;
+
+ // Apply delete filtering and adjust the selected array so that
it is filled with the indices of undeleted rows.
+ while (deleteOutputIterator.hasNext()) {
+ HiveRow row = deleteOutputIterator.next();
+ if (!row.isDeleted()) {
+ batch.selected[newSize++] = row.physicalBatchIndex();
+ }
+ }
+
+ if (newSize < oldSize) {
+ batch.size = newSize;
+ batch.selectedInUse = true;
+ }
+ return currentBatchContext;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ srcIterator.close();
+ }
+ };
+ }
+
+ @Override
+ public void close() throws IOException {
+ batches.close();
+ }
+ };
+ }
+
+ // HiveRow iterable that wraps an interchangeable source HiveRow iterable
+ static class SwappableHiveRowIterable implements CloseableIterable<HiveRow> {
+
+ private CloseableIterator<HiveRow> currentRowIterator;
+
+ @Override
+ public CloseableIterator<HiveRow> iterator() {
+
+ return new CloseableIterator<HiveRow>() {
+
+ @Override
+ public boolean hasNext() {
+ return currentRowIterator.hasNext();
+ }
+
+ @Override
+ public HiveRow next() {
+ return currentRowIterator.next();
+ }
+
+ @Override
+ public void close() throws IOException {
+ currentRowIterator.close();
+ }
+ };
+ }
+
+ @Override
+ public void close() throws IOException {
+ currentRowIterator.close();
+ }
+ }
+}
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index 22131062ddf..8eccb4f1468 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -47,11 +47,14 @@ import
org.apache.hive.iceberg.org.apache.parquet.format.converter.ParquetMetada
import org.apache.hive.iceberg.org.apache.parquet.hadoop.ParquetFileReader;
import
org.apache.hive.iceberg.org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.hive.iceberg.org.apache.parquet.schema.MessageType;
+import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
@@ -76,8 +79,22 @@ public class HiveVectorizedReader {
}
- public static CloseableIterable<HiveBatchContext> reader(Path path,
FileScanTask task,
- Map<Integer, ?> idToConstant, TaskAttemptContext context, Expression
residual) {
+ public static CloseableIterable<HiveBatchContext> reader(Table table, Path
path, FileScanTask task,
+ Map<Integer, ?> idToConstant, TaskAttemptContext context, Expression
residual, Schema readSchema) {
+
+ HiveDeleteFilter deleteFilter = null;
+ Schema requiredSchema = readSchema;
+
+ if (!task.deletes().isEmpty()) {
+ deleteFilter = new HiveDeleteFilter(table.io(), task, table.schema(),
prepareSchemaForDeleteFilter(readSchema));
+ requiredSchema = deleteFilter.requiredSchema();
+ // TODO: take requiredSchema and adjust readColumnIds below accordingly
for equality delete cases
+ // and remove below limitation
+ if (task.deletes().stream().anyMatch(d -> d.content() ==
FileContent.EQUALITY_DELETES)) {
+ throw new UnsupportedOperationException("Vectorized reading with
equality deletes is not supported yet.");
+ }
+ }
+
// Tweaks on jobConf here are relevant for this task only, so we need to
copy it first as context's conf is reused..
JobConf job = new JobConf(context.getConfiguration());
@@ -147,7 +164,10 @@ public class HiveVectorizedReader {
throw new UnsupportedOperationException("Vectorized Hive reading
unimplemented for format: " + format);
}
- return createVectorizedRowBatchIterable(recordReader, job,
partitionColIndices, partitionValues);
+ CloseableIterable<HiveBatchContext> vrbIterable =
+ createVectorizedRowBatchIterable(recordReader, job,
partitionColIndices, partitionValues);
+
+ return deleteFilter != null ? deleteFilter.filterBatch(vrbIterable) :
vrbIterable;
} catch (IOException ioe) {
throw new RuntimeException("Error creating vectorized record reader for
" + path, ioe);
@@ -243,4 +263,15 @@ public class HiveVectorizedReader {
};
}
+ /**
+ * We need to add IS_DELETED metadata field so that DeleteFilter marks
deleted rows rather than filtering them out.
+ * @param schema original schema
+ * @return adjusted schema
+ */
+ private static Schema prepareSchemaForDeleteFilter(Schema schema) {
+ List<Types.NestedField> columns = Lists.newArrayList(schema.columns());
+ columns.add(MetadataColumns.IS_DELETED);
+ return new Schema(columns);
+ }
+
}
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index c60782c6fb7..ae3b31563e8 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -221,11 +221,13 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
if (MetastoreUtil.hive3PresentOnClasspath()) {
HIVE_VECTORIZED_READER_BUILDER = DynMethods.builder("reader")
.impl(HIVE_VECTORIZED_READER_CLASS,
+ Table.class,
Path.class,
FileScanTask.class,
Map.class,
TaskAttemptContext.class,
- Expression.class)
+ Expression.class,
+ Schema.class)
.buildStatic();
} else {
HIVE_VECTORIZED_READER_BUILDER = null;
@@ -326,8 +328,8 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
Expression residual = HiveIcebergInputFormat.residualForTask(task,
context.getConfiguration());
// TODO: We have to take care of the EncryptionManager when LLAP and
vectorization is used
- CloseableIterable<T> iterator =
HIVE_VECTORIZED_READER_BUILDER.invoke(path, task,
- idToConstant, context, residual);
+ CloseableIterable<T> iterator =
HIVE_VECTORIZED_READER_BUILDER.invoke(table, path, task,
+ idToConstant, context, residual, readSchema);
return applyResidualFiltering(iterator, residual, readSchema);
}
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index 0f0f48911e5..c988fa88fd2 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -638,7 +638,7 @@ public abstract class TestTables {
}
}
- enum TestTableType {
+ public enum TestTableType {
HADOOP_TABLE {
@Override
public TestTables instance(Configuration conf, TemporaryFolder
temporaryFolder, String catalogName) {
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java
index 6bc7074a87c..34fc0673153 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.mr.hive.vector;
import java.util.Iterator;
import java.util.List;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -42,16 +43,19 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.mr.hive.HiveIcebergStorageHandlerWithEngineBase;
+import org.apache.iceberg.mr.hive.TestTables;
import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Test;
import static org.apache.iceberg.types.Types.NestedField.optional;
@@ -126,6 +130,64 @@ public class TestHiveIcebergVectorization extends
HiveIcebergStorageHandlerWithE
Assert.assertEquals(genericRowIterator.hasNext(),
hiveRowIterator.hasNext());
}
+ /**
+ * Tests HiveDeleteFilter implementation correctly filtering rows from VRBs.
+ */
+ @Test
+ public void testHiveDeleteFilter() {
+ // The Avro "vectorized" case should actually serve as compareTo scenario
to non-vectorized reading, because
+ // there's no vectorization for Avro and it falls back to the
non-vectorized implementation
+ Assume.assumeTrue(isVectorized && testTableType ==
TestTables.TestTableType.HIVE_CATALOG);
+
+ // Minimal schema to minimize resource footprint of what's coming next...
+ Schema schema = new Schema(
+ optional(1, "customer_id", Types.LongType.get()),
+ optional(2, "customer_age", Types.IntegerType.get())
+ );
+
+ // Generate 106000 records so that we end up with multiple (104) batches
to work with during the read.
+ // Sadly there's no way to control
DeleteFilter.DEFAULT_SET_FILTER_THRESHOLD from this test, so we need two
+ // select statements: one where we have less than
DEFAULT_SET_FILTER_THRESHOLD deletes, & one where there's more.
+ // This should test both in-memory and streaming delete application
implementations of Iceberg.
+ List<Record> records = TestHelper.generateRandomRecords(schema, 106000,
0L);
+
+ // Fill id column with deterministic values
+ for (int i = 0; i < records.size(); ++i) {
+ records.get(i).setField("customer_id", (long) i);
+ }
+ testTables.createTable(shell, "vectordelete", schema,
+ PartitionSpec.unpartitioned(), fileFormat, records, 2);
+
+ // Delete every odd row until 6000
+ shell.executeStatement("DELETE FROM vectordelete WHERE customer_id % 2 = 1
and customer_id < 6000");
+
+ // Delete a whole batch's worth of data overlapping into the previous and
next partial batches (batch size is 1024)
+ shell.executeStatement("DELETE FROM vectordelete WHERE 1000 < customer_id
and customer_id < 3000");
+
+ Function<Integer, Void> validation = expectedCount -> {
+ List<Object[]> result = shell.executeStatement("select * from
vectordelete where customer_id < 6000");
+ Assert.assertEquals(expectedCount.intValue(), result.size());
+
+ for (Object[] row : result) {
+ long id = (long) row[0];
+ Assert.assertTrue("Found row with odd customer_id", id % 2 == 0);
+ Assert.assertTrue("Found a row with customer_id between 1000 and 3000
(both exclusive)",
+ id <= 1000 || 3000 <= id);
+ Assert.assertTrue("Found a row with customer_id >= 6000, i.e. where
clause is not in effect.", id < 6000);
+ }
+
+ return null;
+ };
+
+ // Test with all deletes loaded into memory (DeletePositionIndex), as we
have only 3999 deletes in 2 delete files
+ validation.apply(2001);
+
+ // Test with streamed reading of deletes and data rows as we have 104499
deletes in 3 delete files with:
+ shell.executeStatement("DELETE FROM vectordelete WHERE customer_id >=
5000");
+ // 500 fewer rows as the above statement removed all even rows between
5000 and 6000 that were there previously
+ validation.apply(1501);
+ }
+
/**
* Creates a mock vectorized ORC read job for a particular data file and a
read schema (projecting on all columns)
* @param schema readSchema
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_orc.q.out
b/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_orc.q.out
index 8f314fefcbb..fe94c2b6bc1 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_orc.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_orc.q.out
@@ -90,6 +90,7 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 6 Data size: 576 Basic stats:
COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: string), _col2 (type:
int)
+ Execution mode: vectorized
Map 6
Map Operator Tree:
TableScan
@@ -202,6 +203,7 @@ STAGE PLANS:
Statistics: Num rows: 4 Data size: 644 Basic stats:
COMPLETE Column stats: COMPLETE
value expressions: _col4 (type: bigint)
Reducer 3
+ Execution mode: vectorized
Reduce Operator Tree:
Select Operator
expressions: KEY.reducesinkkey0 (type: int),
KEY.reducesinkkey1 (type: bigint), KEY.reducesinkkey2 (type: string),
KEY.reducesinkkey3 (type: bigint), VALUE._col0 (type: int), VALUE._col1 (type:
string), VALUE._col2 (type: int)
@@ -216,6 +218,7 @@ STAGE PLANS:
serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
name: default.target_ice
Reducer 4
+ Execution mode: vectorized
Reduce Operator Tree:
Select Operator
expressions: KEY.reducesinkkey0 (type: int),
KEY.reducesinkkey1 (type: bigint), KEY.reducesinkkey2 (type: string),
KEY.reducesinkkey3 (type: bigint), VALUE._col0 (type: int), VALUE._col1 (type:
string), VALUE._col2 (type: int)
@@ -230,6 +233,7 @@ STAGE PLANS:
serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
name: default.target_ice
Reducer 5
+ Execution mode: vectorized
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_partitioned_orc.q.out
b/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_partitioned_orc.q.out
index 0ae5eda9c79..d1778673d08 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_partitioned_orc.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_partitioned_orc.q.out
@@ -92,6 +92,7 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 6 Data size: 576 Basic stats:
COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: string), _col2 (type:
int)
+ Execution mode: vectorized
Map 8
Map Operator Tree:
TableScan
@@ -202,6 +203,7 @@ STAGE PLANS:
Statistics: Num rows: 4 Data size: 644 Basic stats:
COMPLETE Column stats: COMPLETE
value expressions: _col4 (type: bigint)
Reducer 3
+ Execution mode: vectorized
Reduce Operator Tree:
Select Operator
expressions: KEY.reducesinkkey0 (type: int),
KEY.reducesinkkey1 (type: bigint), KEY.reducesinkkey2 (type: string),
KEY.reducesinkkey3 (type: bigint), VALUE._col0 (type: int), VALUE._col1 (type:
string), VALUE._col2 (type: int)
@@ -216,6 +218,7 @@ STAGE PLANS:
serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
name: default.target_ice
Reducer 4
+ Execution mode: vectorized
Reduce Operator Tree:
Select Operator
expressions: KEY.reducesinkkey0 (type: int),
KEY.reducesinkkey1 (type: bigint), KEY.reducesinkkey2 (type: string),
KEY.reducesinkkey3 (type: bigint), VALUE._col0 (type: int), VALUE._col1 (type:
string), VALUE._col2 (type: int)
@@ -230,6 +233,7 @@ STAGE PLANS:
serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
name: default.target_ice
Reducer 5
+ Execution mode: vectorized
Reduce Operator Tree:
Select Operator
expressions: VALUE._col0 (type: int), VALUE._col1 (type:
string), VALUE._col2 (type: int), KEY.iceberg_bucket(_col0, 16) (type: int),
KEY._col1 (type: string)
@@ -244,6 +248,7 @@ STAGE PLANS:
serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
name: default.target_ice
Reducer 6
+ Execution mode: vectorized
Reduce Operator Tree:
Select Operator
expressions: VALUE._col0 (type: int), VALUE._col1 (type:
string), VALUE._col2 (type: int), KEY.iceberg_bucket(_col0, 16) (type: int),
KEY._col1 (type: string)
@@ -258,6 +263,7 @@ STAGE PLANS:
serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
name: default.target_ice
Reducer 7
+ Execution mode: vectorized
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)