This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b09f6d5cf [minor] Adjust FlinkCalciteClasses
b09f6d5cf is described below
commit b09f6d5cf9d7420f0083a2ac26c0d63cf35f09bc
Author: Jingsong <[email protected]>
AuthorDate: Mon Jun 24 16:22:57 2024 +0800
[minor] Adjust FlinkCalciteClasses
---
.../predicate/SimpleSqlPredicateConvertor.java | 85 +++++++++++-----------
...alciteModule3.java => FlinkCalciteClasses.java} | 31 ++++----
2 files changed, 58 insertions(+), 58 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java
index 02c98f1cd..dcdcc31a9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertor.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.predicate;
-import org.apache.paimon.flink.utils.CalciteModule3;
+import org.apache.paimon.flink.utils.FlinkCalciteClasses;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.DataType;
@@ -31,82 +31,83 @@ import java.util.function.BiFunction;
/** convert sql to predicate. */
public class SimpleSqlPredicateConvertor {
+
private final PredicateBuilder builder;
private final RowType rowType;
- private CalciteModule3 calciteModule3;
+ private final FlinkCalciteClasses calciteClasses;
public SimpleSqlPredicateConvertor(RowType type) throws Exception {
this.rowType = type;
this.builder = new PredicateBuilder(type);
- calciteModule3 = new CalciteModule3();
+ this.calciteClasses = new FlinkCalciteClasses();
}
public Predicate convertSqlToPredicate(String whereSql) throws Exception {
Object config =
- calciteModule3
+ calciteClasses
.configDelegate()
.withLex(
- calciteModule3.sqlParserDelegate().config(),
- calciteModule3.lexDelegate().java());
- Object sqlParser = calciteModule3.sqlParserDelegate().create(whereSql,
config);
- Object sqlBasicCall =
calciteModule3.sqlParserDelegate().parseExpression(sqlParser);
+ calciteClasses.sqlParserDelegate().config(),
+ calciteClasses.lexDelegate().java());
+ Object sqlParser = calciteClasses.sqlParserDelegate().create(whereSql,
config);
+ Object sqlBasicCall =
calciteClasses.sqlParserDelegate().parseExpression(sqlParser);
return convert(sqlBasicCall);
}
public Predicate convert(Object sqlBasicCall) throws Exception {
- Object operator =
calciteModule3.sqlBasicCallDelegate().getOperator(sqlBasicCall);
- Object kind = calciteModule3.sqlOperatorDelegate().getKind(operator);
+ Object operator =
calciteClasses.sqlBasicCallDelegate().getOperator(sqlBasicCall);
+ Object kind = calciteClasses.sqlOperatorDelegate().getKind(operator);
- if
(calciteModule3.sqlOperatorDelegate().instanceOfSqlBinaryOperator(operator)) {
+ if
(calciteClasses.sqlOperatorDelegate().instanceOfSqlBinaryOperator(operator)) {
List<?> operandList =
-
calciteModule3.sqlBasicCallDelegate().getOperandList(sqlBasicCall);
+
calciteClasses.sqlBasicCallDelegate().getOperandList(sqlBasicCall);
Object left = operandList.get(0);
Object right = operandList.get(1);
- if (kind == calciteModule3.sqlKindDelegate().or()) {
+ if (kind == calciteClasses.sqlKindDelegate().or()) {
return PredicateBuilder.or(convert(left), convert(right));
- } else if (kind == calciteModule3.sqlKindDelegate().and()) {
+ } else if (kind == calciteClasses.sqlKindDelegate().and()) {
return PredicateBuilder.and(convert(left), convert(right));
- } else if (kind == calciteModule3.sqlKindDelegate().equals()) {
+ } else if (kind == calciteClasses.sqlKindDelegate().equals()) {
return visitBiFunction(left, right, builder::equal,
builder::equal);
- } else if (kind == calciteModule3.sqlKindDelegate().notEquals()) {
+ } else if (kind == calciteClasses.sqlKindDelegate().notEquals()) {
return visitBiFunction(left, right, builder::notEqual,
builder::notEqual);
- } else if (kind == calciteModule3.sqlKindDelegate().lessThan()) {
+ } else if (kind == calciteClasses.sqlKindDelegate().lessThan()) {
return visitBiFunction(left, right, builder::lessThan,
builder::greaterThan);
- } else if (kind ==
calciteModule3.sqlKindDelegate().lessThanOrEqual()) {
+ } else if (kind ==
calciteClasses.sqlKindDelegate().lessThanOrEqual()) {
return visitBiFunction(left, right, builder::lessOrEqual,
builder::greaterOrEqual);
- } else if (kind == calciteModule3.sqlKindDelegate().greaterThan())
{
+ } else if (kind == calciteClasses.sqlKindDelegate().greaterThan())
{
return visitBiFunction(left, right, builder::greaterThan,
builder::lessThan);
- } else if (kind ==
calciteModule3.sqlKindDelegate().greaterThanOrEqual()) {
+ } else if (kind ==
calciteClasses.sqlKindDelegate().greaterThanOrEqual()) {
return visitBiFunction(left, right, builder::greaterOrEqual,
builder::lessOrEqual);
- } else if (kind == calciteModule3.sqlKindDelegate().in()) {
- int index = getfieldIndex(left.toString());
- List<?> elementslist =
calciteModule3.sqlNodeListDelegate().getList(right);
+ } else if (kind == calciteClasses.sqlKindDelegate().in()) {
+ int index = getFieldIndex(left.toString());
+ List<?> elementslist =
calciteClasses.sqlNodeListDelegate().getList(right);
List<Object> list = new ArrayList<>();
for (Object sqlNode : elementslist) {
Object literal =
TypeUtils.castFromString(
-
calciteModule3.sqlLiteralDelegate().toValue(sqlNode),
+
calciteClasses.sqlLiteralDelegate().toValue(sqlNode),
rowType.getFieldTypes().get(index));
list.add(literal);
}
return builder.in(index, list);
}
- } else if
(calciteModule3.sqlOperatorDelegate().instanceOfSqlPostfixOperator(operator)) {
+ } else if
(calciteClasses.sqlOperatorDelegate().instanceOfSqlPostfixOperator(operator)) {
Object child =
-
calciteModule3.sqlBasicCallDelegate().getOperandList(sqlBasicCall).get(0);
- if (kind == calciteModule3.sqlKindDelegate().isNull()) {
+
calciteClasses.sqlBasicCallDelegate().getOperandList(sqlBasicCall).get(0);
+ if (kind == calciteClasses.sqlKindDelegate().isNull()) {
String field = String.valueOf(child);
- return builder.isNull(getfieldIndex(field));
- } else if (kind == calciteModule3.sqlKindDelegate().isNotNull()) {
+ return builder.isNull(getFieldIndex(field));
+ } else if (kind == calciteClasses.sqlKindDelegate().isNotNull()) {
String field = String.valueOf(child);
- return builder.isNotNull(getfieldIndex(field));
+ return builder.isNotNull(getFieldIndex(field));
}
- } else if
(calciteModule3.sqlOperatorDelegate().instanceOfSqlPrefixOperator(operator)) {
- if (kind == calciteModule3.sqlKindDelegate().not()) {
+ } else if
(calciteClasses.sqlOperatorDelegate().instanceOfSqlPrefixOperator(operator)) {
+ if (kind == calciteClasses.sqlKindDelegate().not()) {
return convert(
- calciteModule3
+ calciteClasses
.sqlBasicCallDelegate()
.getOperandList(sqlBasicCall)
.get(0))
@@ -124,19 +125,19 @@ public class SimpleSqlPredicateConvertor {
BiFunction<Integer, Object, Predicate> visitLeft,
BiFunction<Integer, Object, Predicate> visitRight)
throws Exception {
- if
(calciteModule3.sqlIndentifierDelegate().instanceOfSqlIdentifier(left)
- &&
calciteModule3.sqlLiteralDelegate().instanceOfSqlLiteral(right)) {
- int index = getfieldIndex(String.valueOf(left));
- String value = calciteModule3.sqlLiteralDelegate().toValue(right);
+ if
(calciteClasses.sqlIndentifierDelegate().instanceOfSqlIdentifier(left)
+ &&
calciteClasses.sqlLiteralDelegate().instanceOfSqlLiteral(right)) {
+ int index = getFieldIndex(String.valueOf(left));
+ String value = calciteClasses.sqlLiteralDelegate().toValue(right);
DataType type = rowType.getFieldTypes().get(index);
return visitLeft.apply(index, TypeUtils.castFromString(value,
type));
- } else if
(calciteModule3.sqlIndentifierDelegate().instanceOfSqlIdentifier(right)
- &&
calciteModule3.sqlLiteralDelegate().instanceOfSqlLiteral(left)) {
- int index = getfieldIndex(right.toString());
+ } else if
(calciteClasses.sqlIndentifierDelegate().instanceOfSqlIdentifier(right)
+ &&
calciteClasses.sqlLiteralDelegate().instanceOfSqlLiteral(left)) {
+ int index = getFieldIndex(right.toString());
return visitRight.apply(
index,
TypeUtils.castFromString(
- calciteModule3.sqlLiteralDelegate().toValue(left),
+ calciteClasses.sqlLiteralDelegate().toValue(left),
rowType.getFieldTypes().get(index)));
}
@@ -144,7 +145,7 @@ public class SimpleSqlPredicateConvertor {
String.format("%s or %s not been supported.", left, right));
}
- public int getfieldIndex(String field) {
+ public int getFieldIndex(String field) {
int index = builder.indexOf(field);
if (index == -1) {
throw new RuntimeException(String.format("Field `%s` not found",
field));
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/CalciteModule3.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java
similarity index 95%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/CalciteModule3.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java
index 70be74269..fd0e7c12f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/CalciteModule3.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCalciteClasses.java
@@ -25,16 +25,23 @@ import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.List;
-/** Class for load calcite dependency via reflection. */
-public class CalciteModule3 {
- private static final Logger LOGGER =
LoggerFactory.getLogger(CalciteModule3.class);
+/**
+ * Class for load calcite dependency via reflection. This is because Flink hides Calcite related
+ * dependencies in a runtime classloader. What this class needs to do is extract the Calcite class
+ * from this special classloader, but it cannot explicitly rely on them and can only be called
+ * through reflection.
+ */
+public class FlinkCalciteClasses {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkCalciteClasses.class);
+
private static final String Flink_PLANNER_MODULE_CLASS =
"org.apache.flink.table.planner.loader.PlannerModule";
private static final String PLANNER_MODULE_METHOD = "getInstance";
-
private static final String SUBMODULE_CLASS_LOADER =
"submoduleClassLoader";
private static final ClassLoader submoduleClassLoader;
+
private final SqlNodeListDelegate sqlNodeListDelegate;
private final SqlLiteralDelegate sqlLiteralDelegate;
private final SqlBasicCallDelegate sqlBasicCallDelegate;
@@ -49,9 +56,10 @@ public class CalciteModule3 {
boolean calciteFound = false;
ClassLoader currentClassLoader =
Thread.currentThread().getContextClassLoader();
try {
+ // this code path is just for testing
currentClassLoader.loadClass(SqlParserDelegate.CLASS_NAME);
calciteFound = true;
- } catch (ClassNotFoundException e) {
+ } catch (ClassNotFoundException ignored) {
}
try {
@@ -61,12 +69,12 @@ public class CalciteModule3 {
submoduleClassLoader = initCalciteClassLoader();
}
} catch (Exception e) {
- LOGGER.error(String.format("Load Calcite class Fail: %s",
e.getMessage()), e);
+ LOG.error(String.format("Load Calcite class Fail: %s",
e.getMessage()), e);
throw new RuntimeException(e);
}
}
- public CalciteModule3() throws ClassNotFoundException {
+ public FlinkCalciteClasses() throws ClassNotFoundException {
sqlNodeListDelegate = new SqlNodeListDelegate();
sqlLiteralDelegate = new SqlLiteralDelegate();
sqlBasicCallDelegate = new SqlBasicCallDelegate();
@@ -204,15 +212,6 @@ public class CalciteModule3 {
/** Accessing org.apache.calcite.sql.parser.SqlParser$Config by
Reflection. */
public static class ConfigDelegate {
static final String CLASS_NAME =
"org.apache.calcite.sql.parser.SqlParser$Config";
- private final Class<?> clazz;
-
- public ConfigDelegate() throws ClassNotFoundException {
- this.clazz = loadCalciteClass(CLASS_NAME);
- }
-
- public Class<?> getClazz() {
- return clazz;
- }
public Object withLex(Object config, Object lex) throws Exception {
return invokeMethod(