lincoln-lil commented on code in PR #22166:
URL: https://github.com/apache/flink/pull/22166#discussion_r1141662148
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala:
##########
@@ -169,17 +171,46 @@ abstract class PlannerBase(
override def getParser: Parser = {
if (parser == null || getTableConfig.getSqlDialect != currentDialect) {
dialectFactory = getDialectFactory
- parser =
- dialectFactory.create(new DefaultParserContext(catalogManager,
plannerContext, executor))
+ parser = dialectFactory.create(
+ new DefaultCalciteContext(catalogManager, getOperationTreeBuilder,
plannerContext))
}
parser
}
+ def getOperationTreeBuilder: OperationTreeBuilder = {
Review Comment:
This method can be 'private'.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala:
##########
@@ -169,17 +171,46 @@ abstract class PlannerBase(
override def getParser: Parser = {
if (parser == null || getTableConfig.getSqlDialect != currentDialect) {
dialectFactory = getDialectFactory
- parser =
- dialectFactory.create(new DefaultParserContext(catalogManager,
plannerContext, executor))
+ parser = dialectFactory.create(
+ new DefaultCalciteContext(catalogManager, getOperationTreeBuilder,
plannerContext))
}
parser
}
+ def getOperationTreeBuilder: OperationTreeBuilder = {
+ OperationTreeBuilderImpl.create(
+ tableConfig,
+ classLoader,
+ functionCatalog.asLookup(f => getParser.parseIdentifier(f)),
+ catalogManager.getDataTypeFactory,
+ (path: String) => getTableReferenceExpression(path),
+ (s: String, inputRowType: RowType, outputType) =>
+ getParser.parseSqlExpression(s, inputRowType, outputType),
+ isStreamingMode
+ )
+ }
+
+ def getTableReferenceExpression(path: String):
Optional[TableReferenceExpression] = {
+ try {
+ val unresolvedIdentifier = getParser.parseIdentifier(path)
+ val tableIdentifier =
catalogManager.qualifyIdentifier(unresolvedIdentifier)
+ val optionalTable = catalogManager.getTable(tableIdentifier)
+ if (!optionalTable.isPresent) {
+ Optional.empty()
+ } else {
+ val queryOperation = new SourceQueryOperation(optionalTable.get())
+ Optional.of(ApiExpressionUtils.tableRef(path, queryOperation))
+ }
+ } catch {
+ case _: SqlParserException => Optional.empty()
Review Comment:
Why is the SqlParserException swallowed here (returning Optional.empty())
instead of being propagated? Are there any test cases covering this exception path?
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -134,9 +138,22 @@ private ResultFetcher(
}
public static ResultFetcher fromTableResult(
- OperationHandle operationHandle,
- TableResultInternal tableResult,
- boolean isQueryResult) {
+ OperationHandle operationHandle, TableResult tableResult, boolean
isQueryResult) {
+ CloseableIterator<RowData> resultRows;
+ RowDataToStringConverter rowDataToStringConverter;
+ if (tableResult instanceof TableResultInternal) {
+ TableResultInternal tableResultInternal = (TableResultInternal)
tableResult;
+ resultRows = tableResultInternal.collectInternal();
+ rowDataToStringConverter =
tableResultInternal.getRowDataToStringConverter();
+ } else {
+ // sometime, the tableResult maybe not an instance of
TableResultInternal
Review Comment:
nit: 'sometimes'
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala:
##########
@@ -169,17 +171,46 @@ abstract class PlannerBase(
override def getParser: Parser = {
if (parser == null || getTableConfig.getSqlDialect != currentDialect) {
dialectFactory = getDialectFactory
- parser =
- dialectFactory.create(new DefaultParserContext(catalogManager,
plannerContext, executor))
+ parser = dialectFactory.create(
+ new DefaultCalciteContext(catalogManager, getOperationTreeBuilder,
plannerContext))
}
parser
}
+ def getOperationTreeBuilder: OperationTreeBuilder = {
+ OperationTreeBuilderImpl.create(
+ tableConfig,
+ classLoader,
+ functionCatalog.asLookup(f => getParser.parseIdentifier(f)),
+ catalogManager.getDataTypeFactory,
+ (path: String) => getTableReferenceExpression(path),
+ (s: String, inputRowType: RowType, outputType) =>
+ getParser.parseSqlExpression(s, inputRowType, outputType),
+ isStreamingMode
+ )
+ }
+
+ def getTableReferenceExpression(path: String):
Optional[TableReferenceExpression] = {
Review Comment:
Ditto: this method can be 'private' as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]