This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 2708ee43540 [fix](paimon) use SlotDescriptor to parse the required fields #28990 (#29053)
2708ee43540 is described below

commit 2708ee4354035a7d67f082234d37960a7b235bcd
Author: Ashin Gau <[email protected]>
AuthorDate: Wed Dec 27 17:44:14 2023 +0800

    [fix](paimon) use SlotDescriptor to parse the required fields #28990 (#29053)
    
    backport: #28990
---
 be/src/vec/exec/format/table/paimon_reader.cpp     |  7 +++-
 .../apache/doris/common/jni/vec/VectorColumn.java  |  9 +++++
 .../org/apache/doris/paimon/PaimonJniScanner.java  | 44 +++++++++-------------
 3 files changed, 32 insertions(+), 28 deletions(-)

diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp b/be/src/vec/exec/format/table/paimon_reader.cpp
index f1ebb96fa10..3a2d3745047 100644
--- a/be/src/vec/exec/format/table/paimon_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_reader.cpp
@@ -42,9 +42,10 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
                                  const TFileRangeDesc& range)
         : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) {
     std::vector<std::string> column_names;
+    std::vector<std::string> column_types;
     for (auto& desc : _file_slot_descs) {
-        std::string field = desc->col_name();
-        column_names.emplace_back(field);
+        column_names.emplace_back(desc->col_name());
+        column_types.emplace_back(JniConnector::get_hive_type(desc->type()));
     }
     std::map<String, String> params;
     params["db_name"] = range.table_format_params.paimon_params.db_name;
@@ -57,6 +58,8 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
     params["tbl_id"] = std::to_string(range.table_format_params.paimon_params.tbl_id);
     params["last_update_time"] =
             std::to_string(range.table_format_params.paimon_params.last_update_time);
+    params["required_fields"] = join(column_names, ",");
+    params["columns_types"] = join(column_types, "#");
 
     // Used to create paimon option
     for (auto& kv : range.table_format_params.paimon_params.paimon_options) {
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java
index 381422931d4..f79951b4570 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java
@@ -563,6 +563,9 @@ public class VectorColumn {
     public int appendBytesAndOffset(byte[] src, int offset, int length) {
         int startOffset = childColumns[0].appendBytes(src, offset, length);
         reserve(appendIndex + 1);
+        if (startOffset + length < 0) {
+            throw new RuntimeException("String overflow, offset=" + startOffset + ", length=" + length);
+        }
         OffHeap.putInt(null, offsets + 4L * appendIndex, startOffset + length);
         return appendIndex++;
     }
@@ -591,6 +594,9 @@ public class VectorColumn {
             childColumns[0].appendValue(v);
         }
         reserve(appendIndex + 1);
+        if (startOffset + length < 0) {
+            throw new RuntimeException("Array overflow, offset=" + startOffset + ", length=" + length);
+        }
         OffHeap.putLong(null, offsets + 8L * appendIndex, startOffset + length);
         return appendIndex++;
     }
@@ -605,6 +611,9 @@ public class VectorColumn {
             childColumns[1].appendValue(v);
         }
         reserve(appendIndex + 1);
+        if (startOffset + length < 0) {
+            throw new RuntimeException("Map overflow, offset=" + startOffset + ", length=" + length);
+        }
         OffHeap.putLong(null, offsets + 8L * appendIndex, startOffset + length);
         return appendIndex++;
     }
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 849f6fb67bb..be1505c4ef1 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -30,7 +30,6 @@ import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.types.DataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +42,7 @@ import java.util.stream.Collectors;
 public class PaimonJniScanner extends JniScanner {
     private static final Logger LOG = LoggerFactory.getLogger(PaimonJniScanner.class);
     private static final String PAIMON_OPTION_PREFIX = "paimon_option_prefix.";
+    private final Map<String, String> params;
     private final Map<String, String> paimonOptionParams;
     private final String dbName;
     private final String tblName;
@@ -61,6 +61,13 @@ public class PaimonJniScanner extends JniScanner {
 
     public PaimonJniScanner(int batchSize, Map<String, String> params) {
         LOG.debug("params:{}", params);
+        this.params = params;
+        String[] requiredFields = params.get("required_fields").split(",");
+        String[] requiredTypes = params.get("columns_types").split("#");
+        ColumnType[] columnTypes = new ColumnType[requiredTypes.length];
+        for (int i = 0; i < requiredTypes.length; i++) {
+            columnTypes[i] = ColumnType.parseType(requiredFields[i], requiredTypes[i]);
+        }
         paimonSplit = params.get("paimon_split");
         paimonPredicate = params.get("paimon_predicate");
         dbName = params.get("db_name");
@@ -69,21 +76,17 @@ public class PaimonJniScanner extends JniScanner {
         dbId = Long.parseLong(params.get("db_id"));
         tblId = Long.parseLong(params.get("tbl_id"));
         lastUpdateTime = Long.parseLong(params.get("last_update_time"));
-        super.batchSize = batchSize;
-        super.fields = params.get("paimon_column_names").split(",");
-        super.predicates = new ScanPredicate[0];
+        initTableInfo(columnTypes, requiredFields, new ScanPredicate[0], batchSize);
         paimonOptionParams = params.entrySet().stream()
                 .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
                 .collect(Collectors
                         .toMap(kv1 -> kv1.getKey().substring(PAIMON_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
-
     }
 
     @Override
     public void open() throws IOException {
         initTable();
         initReader();
-        parseRequiredTypes();
     }
 
     private void initReader() throws IOException {
@@ -99,30 +102,16 @@ public class PaimonJniScanner extends JniScanner {
 
     private List<Predicate> getPredicates() {
         List<Predicate> predicates = PaimonScannerUtils.decodeStringToObject(paimonPredicate);
-        LOG.info("predicates:{}", predicates);
+        LOG.debug("predicates:{}", predicates);
         return predicates;
     }
 
     private Split getSplit() {
         Split split = PaimonScannerUtils.decodeStringToObject(paimonSplit);
-        LOG.info("split:{}", split);
+        LOG.debug("split:{}", split);
         return split;
     }
 
-    private void parseRequiredTypes() {
-        ColumnType[] columnTypes = new ColumnType[fields.length];
-        for (int i = 0; i < fields.length; i++) {
-            int index = paimonAllFieldNames.indexOf(fields[i]);
-            if (index == -1) {
-                throw new RuntimeException(String.format("Cannot find field %s in schema %s",
-                        fields[i], paimonAllFieldNames));
-            }
-            DataType dataType = table.rowType().getTypeAt(index);
-            columnTypes[i] = PaimonTypeUtils.fromPaimonType(fields[i], dataType);
-        }
-        super.types = columnTypes;
-    }
-
     @Override
     public void close() throws IOException {
         if (reader != null) {
@@ -154,9 +143,12 @@ public class PaimonJniScanner extends JniScanner {
                 recordIterator.releaseBatch();
                 recordIterator = reader.readBatch();
             }
-        } catch (IOException e) {
-            LOG.warn("failed to getNext columnValue ", e);
-            throw new RuntimeException(e);
+        } catch (Exception e) {
+            close();
+            LOG.warn("Failed to get the next batch of paimon. "
+                            + "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}",
+                    getSplit(), params.get("required_fields"), paimonAllFieldNames, e);
+            throw new IOException(e);
         }
         return rows;
     }
@@ -178,7 +170,7 @@ public class PaimonJniScanner extends JniScanner {
         }
         this.table = tableExt.getTable();
         paimonAllFieldNames = PaimonScannerUtils.fieldNames(this.table.rowType());
-        LOG.info("paimonAllFieldNames:{}", paimonAllFieldNames);
+        LOG.debug("paimonAllFieldNames:{}", paimonAllFieldNames);
     }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to