This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a8418ccd2c4 [fix](iceberg)The manifest file needs to read the "exists"
type. (#50434)
a8418ccd2c4 is described below
commit a8418ccd2c4fc0c9eeb571d5d394c896988c907b
Author: wuwenchi <[email protected]>
AuthorDate: Tue Apr 29 16:21:34 2025 +0800
[fix](iceberg)The manifest file needs to read the "exists" type. (#50434)
### What problem does this PR solve?
related pr: #49489
Problem Summary:
The manifest file needs to read the "exists" type.
---
.../datasource/iceberg/source/IcebergScanNode.java | 16 +-
.../iceberg/source/IcebergScanNodeTest.java | 181 +++++++++++++++++++++
2 files changed, 194 insertions(+), 3 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index a634aa6c91f..8707f4b06b5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -48,6 +48,7 @@ import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPushAggOp;
import org.apache.doris.thrift.TTableFormatFileDesc;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -102,6 +103,12 @@ public class IcebergScanNode extends FileQueryScanNode {
private PreExecutionAuthenticator preExecutionAuthenticator;
private TableScan icebergTableScan;
+ // for test
+ @VisibleForTesting
+ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
+ super(id, desc, "ICEBERG_SCAN_NODE",
StatisticalType.ICEBERG_SCAN_NODE, false, sv);
+ }
+
/**
* External file scan node for Query iceberg table
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check
column priv
@@ -249,7 +256,8 @@ public class IcebergScanNode extends FileQueryScanNode {
});
}
- private TableScan createTableScan() {
+ @VisibleForTesting
+ public TableScan createTableScan() {
if (icebergTableScan != null) {
return icebergTableScan;
}
@@ -368,7 +376,8 @@ public class IcebergScanNode extends FileQueryScanNode {
createTableScan().filter()).iterator()) {
int cnt = 0;
while (matchingManifest.hasNext()) {
- cnt += matchingManifest.next().addedFilesCount();
+ ManifestFile next = matchingManifest.next();
+ cnt += next.addedFilesCount() +
next.existingFilesCount();
if (cnt >= sessionVariable.getNumFilesInBatchMode()) {
return true;
}
@@ -470,7 +479,8 @@ public class IcebergScanNode extends FileQueryScanNode {
return !col.isAllowNull();
}
- private long getCountFromSnapshot() {
+ @VisibleForTesting
+ public long getCountFromSnapshot() {
Long specifiedSnapshot = getSpecifiedSnapshot();
Snapshot snapshot = specifiedSnapshot == null
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
new file mode 100644
index 00000000000..8ae51a61f46
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TPushAggOp;
+
+import com.google.common.collect.Lists;
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hadoop.HadoopTableOperations;
+import org.apache.iceberg.io.CloseableIterable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class IcebergScanNodeTest {
+
+ @Mocked
+ HadoopTableOperations hadoopTableOperations;
+ @Mocked
+ Snapshot snapshot;
+
+ @Test
+ public void testIsBatchMode() {
+ SessionVariable sessionVariable = new SessionVariable();
+ IcebergScanNode icebergScanNode = new IcebergScanNode(new
PlanNodeId(1), new TupleDescriptor(new TupleId(1)), sessionVariable);
+
+ new Expectations(icebergScanNode) {{
+ icebergScanNode.getPushDownAggNoGroupingOp();
+ result = TPushAggOp.COUNT;
+ icebergScanNode.getCountFromSnapshot();
+ result = 1L;
+ }};
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+ BaseTable mockTable = new BaseTable(hadoopTableOperations,
"mockTable");
+ new Expectations(icebergScanNode) {{
+ icebergScanNode.getPushDownAggNoGroupingOp();
+ result = TPushAggOp.NONE;
+ Deencapsulation.setField(icebergScanNode, "icebergTable",
mockTable);
+ }};
+ TableScan tableScan = mockTable.newScan();
+ new Expectations(mockTable) {{
+ mockTable.currentSnapshot();
+ result = null;
+ icebergScanNode.createTableScan();
+ result = tableScan;
+ }};
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+ new Expectations(mockTable) {{
+ mockTable.currentSnapshot();
+ result = snapshot;
+ }};
+ new Expectations(sessionVariable) {{
+ sessionVariable.getEnableExternalTableBatchMode();
+ result = false;
+ }};
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+
+ new Expectations(sessionVariable) {{
+ sessionVariable.getEnableExternalTableBatchMode();
+ result = true;
+ }};
+ new Expectations(icebergScanNode) {{
+ Deencapsulation.setField(icebergScanNode,
"preExecutionAuthenticator", new PreExecutionAuthenticator());
+ }};
+ new Expectations() {{
+ sessionVariable.getNumFilesInBatchMode();
+ result = 1024;
+ }};
+
+ mockManifestFile("p", 10, 0);
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 0, 10);
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 10, 10);
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 1024, 0);
+ Assert.assertTrue(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 0, 1024);
+ Assert.assertTrue(icebergScanNode.isBatchMode());
+
+ new Expectations() {{
+ sessionVariable.getNumFilesInBatchMode();
+ result = 100;
+ }};
+
+ mockManifestFile("p", 10, 0);
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 0, 10);
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 10, 10);
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 0, 100);
+ Assert.assertTrue(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 100, 0);
+ Assert.assertTrue(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 10, 90);
+ Assert.assertTrue(icebergScanNode.isBatchMode());
+ }
+
+ private void mockManifestFile(String path, int addedFileCount, int
existingFileCount) {
+ new MockUp<IcebergUtils>() {
+ @Mock
+ CloseableIterable<ManifestFile>
getMatchingManifest(List<ManifestFile> dataManifests,
+ Map<Integer,
PartitionSpec> specsById,
+ Expression
dataFilter) {
+ return CloseableIterable.withNoopClose(new
ArrayList<ManifestFile>() {{
+ add(genManifestFile(path, addedFileCount,
existingFileCount));
+ }}
+ );
+ }
+ };
+ }
+
+ private ManifestFile genManifestFile(String path, int addedFileCount, int
existingFileCount) {
+ return new GenericManifestFile(
+ path,
+ 10, // length
+ 1, // specId
+ ManifestContent.DATA,
+ 1, // sequenceNumber
+ 1, // minSeqNumber
+ 1L, // snapshotId
+ addedFileCount,
+ 1,
+ existingFileCount,
+ 1,
+ 0, // deleteFilesCount
+ 0,
+ Lists.newArrayList(),
+ null
+ );
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]