This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 8c9fcf0a630 [fix](jni) remove 'push_down_predicates' and fix BE crash
with decimal predicate (#32253) (#32599)
8c9fcf0a630 is described below
commit 8c9fcf0a6309fa993e9e235dd6709c83872b895e
Author: Mingyu Chen <[email protected]>
AuthorDate: Thu Mar 21 13:48:51 2024 +0800
[fix](jni) remove 'push_down_predicates' and fix BE crash with decimal
predicate (#32253) (#32599)
---
be/src/vec/exec/jni_connector.cpp | 17 ++++++++++-------
be/src/vec/exec/jni_connector.h | 3 +++
.../main/java/org/apache/doris/avro/AvroJNIScanner.java | 3 +--
.../main/java/org/apache/doris/hudi/HudiJniScanner.java | 16 +---------------
.../java/org/apache/doris/common/jni/JniScanner.java | 12 +++++++++---
.../org/apache/doris/common/jni/MockJniScanner.java | 11 +----------
.../apache/doris/maxcompute/MaxComputeJniScanner.java | 16 +++-------------
.../java/org/apache/doris/paimon/PaimonJniScanner.java | 3 +--
8 files changed, 29 insertions(+), 52 deletions(-)
diff --git a/be/src/vec/exec/jni_connector.cpp
b/be/src/vec/exec/jni_connector.cpp
index 00c0dcd8c7b..b802ff2431c 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -102,13 +102,16 @@ Status JniConnector::open(RuntimeState* state,
RuntimeProfile* profile) {
Status JniConnector::init(
std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
- _generate_predicates(colname_to_value_range);
- if (_predicates_length != 0 && _predicates != nullptr) {
- int64_t predicates_address = (int64_t)_predicates.get();
- // We can call
org.apache.doris.common.jni.vec.ScanPredicate#parseScanPredicates to parse the
- // serialized predicates in java side.
- _scanner_params.emplace("push_down_predicates",
std::to_string(predicates_address));
- }
+ // TODO: This logic needs to be changed.
+ // See the comment of "predicates" field in JniScanner.java
+
+ // _generate_predicates(colname_to_value_range);
+ // if (_predicates_length != 0 && _predicates != nullptr) {
+ // int64_t predicates_address = (int64_t)_predicates.get();
+ // // We can call
org.apache.doris.common.jni.vec.ScanPredicate#parseScanPredicates to parse the
+ // // serialized predicates in java side.
+ // _scanner_params.emplace("push_down_predicates",
std::to_string(predicates_address));
+ // }
return Status::OK();
}
diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h
index ae3711f663a..22e33f01053 100644
--- a/be/src/vec/exec/jni_connector.h
+++ b/be/src/vec/exec/jni_connector.h
@@ -164,6 +164,9 @@ public:
char_ptr += s->size;
}
} else {
+ // FIXME: this cannot handle the decimal type correctly,
+ // but this logic is deprecated and no longer used,
+ // so it may be deleted or fixed later.
for (const CppType* v : values) {
int type_len = sizeof(CppType);
*reinterpret_cast<int*>(char_ptr) = type_len;
diff --git
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
index 17a185d03ae..dc845f43cb8 100644
---
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
+++
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
@@ -19,7 +19,6 @@ package org.apache.doris.avro;
import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
-import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.jni.vec.TableSchema;
import org.apache.doris.thrift.TFileType;
@@ -173,7 +172,7 @@ public class AvroJNIScanner extends JniScanner {
try {
initAvroFileContext();
initFieldInspector();
- initTableInfo(requiredTypes, requiredFields, new ScanPredicate[0],
fetchSize);
+ initTableInfo(requiredTypes, requiredFields, fetchSize);
} catch (Exception e) {
LOG.warn("Failed to init avro scanner. ", e);
throw new RuntimeException(e);
diff --git
a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
index 932b53b9a7c..0a1b69fcfb0 100644
---
a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
+++
b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
@@ -20,7 +20,6 @@ package org.apache.doris.hudi;
import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
-import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopUGI;
@@ -59,7 +58,6 @@ public class HudiJniScanner extends JniScanner {
private final int fetchSize;
private final String debugString;
private final HoodieSplit split;
- private final ScanPredicate[] predicates;
private final ClassLoader classLoader;
private long getRecordReaderTimeNs = 0;
@@ -123,20 +121,8 @@ public class HudiJniScanner extends JniScanner {
.collect(Collectors.joining("\n"));
try {
this.classLoader = this.getClass().getClassLoader();
- String predicatesAddressString =
params.remove("push_down_predicates");
this.fetchSize = fetchSize;
this.split = new HoodieSplit(params);
- if (predicatesAddressString == null) {
- predicates = new ScanPredicate[0];
- } else {
- long predicatesAddress =
Long.parseLong(predicatesAddressString);
- if (predicatesAddress != 0) {
- predicates =
ScanPredicate.parseScanPredicates(predicatesAddress, split.requiredTypes());
- LOG.info("HudiJniScanner gets pushed-down predicates: " +
ScanPredicate.dump(predicates));
- } else {
- predicates = new ScanPredicate[0];
- }
- }
} catch (Exception e) {
LOG.error("Failed to initialize hudi scanner, split params:\n" +
debugString, e);
throw e;
@@ -147,7 +133,7 @@ public class HudiJniScanner extends JniScanner {
public void open() throws IOException {
Future<?> avroFuture = avroReadPool.submit(() -> {
Thread.currentThread().setContextClassLoader(classLoader);
- initTableInfo(split.requiredTypes(), split.requiredFields(),
predicates, fetchSize);
+ initTableInfo(split.requiredTypes(), split.requiredFields(),
fetchSize);
long startTime = System.nanoTime();
// RecordReader will use ProcessBuilder to start a hotspot
process, which may be stuck,
// so use another process to kill this stuck process.
diff --git
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
index 51ed837813c..cb191f4b038 100644
---
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
+++
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/JniScanner.java
@@ -33,6 +33,14 @@ public abstract class JniScanner {
protected VectorTable vectorTable;
protected String[] fields;
protected ColumnType[] types;
+ @Deprecated
+ // This predicate is from BE, but is not used.
+ // TODO: actually, we can generate the predicate for JNI scanner in FE's
planner,
+ // then serialize it to BE, and BE pass it to JNI scanner directly.
+ // NO need to use this intermediate expression, because each JNI scanner
has its
+ // own predicate expression format.
+ // For example, Paimon uses
"PaimonScannerUtils.decodeStringToObject(paimonPredicate)"
+ // to deserialize the predicate string to PaimonPredicate object.
protected ScanPredicate[] predicates;
protected int batchSize;
@@ -50,11 +58,9 @@ public abstract class JniScanner {
throw new UnsupportedOperationException();
}
- protected void initTableInfo(ColumnType[] requiredTypes, String[]
requiredFields, ScanPredicate[] predicates,
- int batchSize) {
+ protected void initTableInfo(ColumnType[] requiredTypes, String[]
requiredFields, int batchSize) {
this.types = requiredTypes;
this.fields = requiredFields;
- this.predicates = predicates;
this.batchSize = batchSize;
}
diff --git
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
index bc7561e2a23..000f536e915 100644
---
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
+++
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java
@@ -20,7 +20,6 @@ package org.apache.doris.common.jni;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ColumnValue;
-import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.log4j.Logger;
@@ -187,15 +186,7 @@ public class MockJniScanner extends JniScanner {
for (int i = 0; i < types.length; i++) {
columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
}
- ScanPredicate[] predicates = new ScanPredicate[0];
- if (params.containsKey("push_down_predicates")) {
- long predicatesAddress =
Long.parseLong(params.get("push_down_predicates"));
- if (predicatesAddress != 0) {
- predicates =
ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
- LOG.info("MockJniScanner gets pushed-down predicates: " +
ScanPredicate.dump(predicates));
- }
- }
- initTableInfo(columnTypes, requiredFields, predicates, batchSize);
+ initTableInfo(columnTypes, requiredFields, batchSize);
}
@Override
diff --git
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index a87446b14b7..6a441a69293 100644
---
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -19,7 +19,6 @@ package org.apache.doris.maxcompute;
import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
-import org.apache.doris.common.jni.vec.ScanPredicate;
import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
@@ -99,15 +98,7 @@ public class MaxComputeJniScanner extends JniScanner {
for (int i = 0; i < types.length; i++) {
columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
}
- ScanPredicate[] predicates = new ScanPredicate[0];
- if (params.containsKey("push_down_predicates")) {
- long predicatesAddress =
Long.parseLong(params.get("push_down_predicates"));
- if (predicatesAddress != 0) {
- predicates =
ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
- LOG.info("MaxComputeJniScanner gets pushed-down predicates: "
+ ScanPredicate.dump(predicates));
- }
- }
- initTableInfo(columnTypes, requiredFields, predicates, batchSize);
+ initTableInfo(columnTypes, requiredFields, batchSize);
}
public void refreshTableScan() {
@@ -133,9 +124,8 @@ public class MaxComputeJniScanner extends JniScanner {
}
@Override
- protected void initTableInfo(ColumnType[] requiredTypes, String[]
requiredFields, ScanPredicate[] predicates,
- int batchSize) {
- super.initTableInfo(requiredTypes, requiredFields, predicates,
batchSize);
+ protected void initTableInfo(ColumnType[] requiredTypes, String[]
requiredFields, int batchSize) {
+ super.initTableInfo(requiredTypes, requiredFields, batchSize);
readColumns = new ArrayList<>();
readColumnsToId = new HashMap<>();
for (int i = 0; i < fields.length; i++) {
diff --git
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 69ec49e3364..ad45c729190 100644
---
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -19,7 +19,6 @@ package org.apache.doris.paimon;
import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
-import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.jni.vec.TableSchema;
import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey;
import org.apache.doris.paimon.PaimonTableCache.TableExt;
@@ -82,7 +81,7 @@ public class PaimonJniScanner extends JniScanner {
dbId = Long.parseLong(params.get("db_id"));
tblId = Long.parseLong(params.get("tbl_id"));
lastUpdateTime = Long.parseLong(params.get("last_update_time"));
- initTableInfo(columnTypes, requiredFields, new ScanPredicate[0],
batchSize);
+ initTableInfo(columnTypes, requiredFields, batchSize);
paimonOptionParams = params.entrySet().stream()
.filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
.collect(Collectors
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]