This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 2708ee43540 [fix](paimon) use SlotDescriptor to parse the required fields #28990 (#29053)
2708ee43540 is described below
commit 2708ee4354035a7d67f082234d37960a7b235bcd
Author: Ashin Gau <[email protected]>
AuthorDate: Wed Dec 27 17:44:14 2023 +0800
[fix](paimon) use SlotDescriptor to parse the required fields #28990 (#29053)
backport: #28990
---
be/src/vec/exec/format/table/paimon_reader.cpp | 7 +++-
.../apache/doris/common/jni/vec/VectorColumn.java | 9 +++++
.../org/apache/doris/paimon/PaimonJniScanner.java | 44 +++++++++-------------
3 files changed, 32 insertions(+), 28 deletions(-)
diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp
b/be/src/vec/exec/format/table/paimon_reader.cpp
index f1ebb96fa10..3a2d3745047 100644
--- a/be/src/vec/exec/format/table/paimon_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_reader.cpp
@@ -42,9 +42,10 @@ PaimonJniReader::PaimonJniReader(const
std::vector<SlotDescriptor*>& file_slot_d
const TFileRangeDesc& range)
: _file_slot_descs(file_slot_descs), _state(state), _profile(profile) {
std::vector<std::string> column_names;
+ std::vector<std::string> column_types;
for (auto& desc : _file_slot_descs) {
- std::string field = desc->col_name();
- column_names.emplace_back(field);
+ column_names.emplace_back(desc->col_name());
+ column_types.emplace_back(JniConnector::get_hive_type(desc->type()));
}
std::map<String, String> params;
params["db_name"] = range.table_format_params.paimon_params.db_name;
@@ -57,6 +58,8 @@ PaimonJniReader::PaimonJniReader(const
std::vector<SlotDescriptor*>& file_slot_d
params["tbl_id"] =
std::to_string(range.table_format_params.paimon_params.tbl_id);
params["last_update_time"] =
std::to_string(range.table_format_params.paimon_params.last_update_time);
+ params["required_fields"] = join(column_names, ",");
+ params["columns_types"] = join(column_types, "#");
// Used to create paimon option
for (auto& kv : range.table_format_params.paimon_params.paimon_options) {
diff --git
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java
index 381422931d4..f79951b4570 100644
---
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java
+++
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java
@@ -563,6 +563,9 @@ public class VectorColumn {
public int appendBytesAndOffset(byte[] src, int offset, int length) {
int startOffset = childColumns[0].appendBytes(src, offset, length);
reserve(appendIndex + 1);
+ if (startOffset + length < 0) {
+ throw new RuntimeException("String overflow, offset=" +
startOffset + ", length=" + length);
+ }
OffHeap.putInt(null, offsets + 4L * appendIndex, startOffset + length);
return appendIndex++;
}
@@ -591,6 +594,9 @@ public class VectorColumn {
childColumns[0].appendValue(v);
}
reserve(appendIndex + 1);
+ if (startOffset + length < 0) {
+ throw new RuntimeException("Array overflow, offset=" + startOffset
+ ", length=" + length);
+ }
OffHeap.putLong(null, offsets + 8L * appendIndex, startOffset +
length);
return appendIndex++;
}
@@ -605,6 +611,9 @@ public class VectorColumn {
childColumns[1].appendValue(v);
}
reserve(appendIndex + 1);
+ if (startOffset + length < 0) {
+ throw new RuntimeException("Map overflow, offset=" + startOffset +
", length=" + length);
+ }
OffHeap.putLong(null, offsets + 8L * appendIndex, startOffset +
length);
return appendIndex++;
}
diff --git
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 849f6fb67bb..be1505c4ef1 100644
---
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -30,7 +30,6 @@ import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
-import org.apache.paimon.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +42,7 @@ import java.util.stream.Collectors;
public class PaimonJniScanner extends JniScanner {
private static final Logger LOG =
LoggerFactory.getLogger(PaimonJniScanner.class);
private static final String PAIMON_OPTION_PREFIX = "paimon_option_prefix.";
+ private final Map<String, String> params;
private final Map<String, String> paimonOptionParams;
private final String dbName;
private final String tblName;
@@ -61,6 +61,13 @@ public class PaimonJniScanner extends JniScanner {
public PaimonJniScanner(int batchSize, Map<String, String> params) {
LOG.debug("params:{}", params);
+ this.params = params;
+ String[] requiredFields = params.get("required_fields").split(",");
+ String[] requiredTypes = params.get("columns_types").split("#");
+ ColumnType[] columnTypes = new ColumnType[requiredTypes.length];
+ for (int i = 0; i < requiredTypes.length; i++) {
+ columnTypes[i] = ColumnType.parseType(requiredFields[i],
requiredTypes[i]);
+ }
paimonSplit = params.get("paimon_split");
paimonPredicate = params.get("paimon_predicate");
dbName = params.get("db_name");
@@ -69,21 +76,17 @@ public class PaimonJniScanner extends JniScanner {
dbId = Long.parseLong(params.get("db_id"));
tblId = Long.parseLong(params.get("tbl_id"));
lastUpdateTime = Long.parseLong(params.get("last_update_time"));
- super.batchSize = batchSize;
- super.fields = params.get("paimon_column_names").split(",");
- super.predicates = new ScanPredicate[0];
+ initTableInfo(columnTypes, requiredFields, new ScanPredicate[0],
batchSize);
paimonOptionParams = params.entrySet().stream()
.filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
.collect(Collectors
.toMap(kv1 ->
kv1.getKey().substring(PAIMON_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
-
}
@Override
public void open() throws IOException {
initTable();
initReader();
- parseRequiredTypes();
}
private void initReader() throws IOException {
@@ -99,30 +102,16 @@ public class PaimonJniScanner extends JniScanner {
private List<Predicate> getPredicates() {
List<Predicate> predicates =
PaimonScannerUtils.decodeStringToObject(paimonPredicate);
- LOG.info("predicates:{}", predicates);
+ LOG.debug("predicates:{}", predicates);
return predicates;
}
private Split getSplit() {
Split split = PaimonScannerUtils.decodeStringToObject(paimonSplit);
- LOG.info("split:{}", split);
+ LOG.debug("split:{}", split);
return split;
}
- private void parseRequiredTypes() {
- ColumnType[] columnTypes = new ColumnType[fields.length];
- for (int i = 0; i < fields.length; i++) {
- int index = paimonAllFieldNames.indexOf(fields[i]);
- if (index == -1) {
- throw new RuntimeException(String.format("Cannot find field %s
in schema %s",
- fields[i], paimonAllFieldNames));
- }
- DataType dataType = table.rowType().getTypeAt(index);
- columnTypes[i] = PaimonTypeUtils.fromPaimonType(fields[i],
dataType);
- }
- super.types = columnTypes;
- }
-
@Override
public void close() throws IOException {
if (reader != null) {
@@ -154,9 +143,12 @@ public class PaimonJniScanner extends JniScanner {
recordIterator.releaseBatch();
recordIterator = reader.readBatch();
}
- } catch (IOException e) {
- LOG.warn("failed to getNext columnValue ", e);
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ close();
+ LOG.warn("Failed to get the next batch of paimon. "
+ + "split: {}, requiredFieldNames: {},
paimonAllFieldNames: {}",
+ getSplit(), params.get("required_fields"),
paimonAllFieldNames, e);
+ throw new IOException(e);
}
return rows;
}
@@ -178,7 +170,7 @@ public class PaimonJniScanner extends JniScanner {
}
this.table = tableExt.getTable();
paimonAllFieldNames =
PaimonScannerUtils.fieldNames(this.table.rowType());
- LOG.info("paimonAllFieldNames:{}", paimonAllFieldNames);
+ LOG.debug("paimonAllFieldNames:{}", paimonAllFieldNames);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]