xinli liang created FLINK-25078:
-----------------------------------
Summary: An error is reported when Flink SQL connects to Phoenix
Key: FLINK-25078
URL: https://issues.apache.org/jira/browse/FLINK-25078
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Environment: Maven configuration environment is as follows:
{code:java}
// code placeholder
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>iyb-db-account</artifactId>
<groupId>org.iyunbao</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>iyb-realtime</artifactId>
<properties>
<!-- Shared build properties. java.version feeds both compiler settings;
     flink.version, scala.version and hadoop.version are referenced by the
     dependency declarations of this POM. -->
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.12.0</flink.version>
<!-- Used as the "_<scala>" suffix of the Flink artifactIds. -->
<scala.version>2.12</scala.version>
<hadoop.version>3.1.3</hadoop.version>
</properties>
<dependencies>
<!-- NOTE(review): this artifact hard-codes the _2.11 suffix and version 1.12.3,
     while the properties of this POM declare scala.version 2.12 and
     flink.version 1.12.0. Confirm the mismatch is intended. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-2.2_2.11</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- NOTE(review): flink-table-planner is declared here (scope provided) and
     flink-table-planner-blink is declared further down; the job code selects
     the Blink planner. Confirm both are needed. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- NOTE(review): version 1.6.2 is pinned here independently of flink.version. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop2</artifactId>
<version>1.6.2</version>
</dependency>
<!-- NOTE(review): three JDBC connector artifacts are declared in this POM:
     flink-jdbc_2.12 1.8.0 (here), flink-connector-jdbc at flink.version and
     com.alibaba.blink:flink-jdbc 1.5.1 (both further down). The reported stack
     trace goes through org.apache.flink.connector.jdbc, which presumably comes
     from flink-connector-jdbc. Confirm whether the other two are needed. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.8.0</version>
</dependency>
<!-- dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.8</version>
</dependency -->
<!-- This dependency is required if checkpoints are saved to HDFS. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- Flink logs through slf4j by default, which acts as a logging interface; log4j is used here as the concrete logging implementation. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
<!-- Lombok plugin dependency. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.blink</groupId>
<artifactId>flink-jdbc</artifactId>
<version>1.5.1</version>
</dependency>
<!-- Provides the Phoenix JDBC driver class named in the table options of the job
     (org.apache.phoenix.jdbc.PhoenixDriver), presumably; verify against the jar. -->
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>5.0.0-HBase-2.0</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.0.3</version>
</dependency>-->
<!-- <dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>5.0.0-HBase-2.0</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>-->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils-core</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Runs the assembly plugin's "single" goal in the package phase using the
     predefined jar-with-dependencies descriptor. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> {code}
Reporter: xinli liang
I would like to know where the problem is. Thanks.
The code is as follows:
{code:java}
// code placeholder
package com.iyunbao.app.dim;
import com.iyunbao.common.GmallConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author liangxinli
* @create 2021-11-03 10:33
*/
public class dim_agt_account_cloud_hhbak {
public static void main(String[] args) {
//创建执行环境
StreamExecutionEnvironment senv =
StreamExecutionEnvironment.getExecutionEnvironment();
//todo 创建表环境 bug比较多,建议指定版本
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build();
//从流环境转换成表环境
StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(senv,settings);
try {
Class.forName(GmallConfig.PHOENIX_DRIVER);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
//把mysql 中的表映射成一个输出临时表
tableEnv.executeSql(
"Create Table `t_user` (" +
" `id` varchar ," +
" `open_id` varchar ," +
" `user_name` varchar ," +
" `password` varchar ," +
" `type` tinyint ," +
" `login_time` timestamp ," +
" `extra` varchar ," +
" `target` varchar ," +
" `platform_id` bigint ," +
" `status` tinyint ," +
" `is_bt` varchar ," +
" `settle_type` varchar ," +
" `account_type` varchar ," +
" `is_rate_visable` varchar ," +
" `create_time` timestamp ," +
" `creator` bigint ," +
" `update_time` timestamp ," +
" `updater` bigint ," +
" `valid` varchar ," +
" `mobile` varchar ," +
" `is_relation_role` varchar ," +
" `source` varchar ," +
" `unique_no` varchar ," +
" `is_bind_bt` varchar , " +
"PRIMARY KEY (id) NOT ENFORCED" +
")" +
"with (" +
" 'connector' = 'jdbc'," +
" 'driver' =
'org.apache.phoenix.jdbc.PhoenixDriver',"+
" 'url' = 'jdbc:phoenix://hadoop112:2181'," +
" 'table-name' = 't_user'" +
" )");
//todo 对数据的处理方式
tableEnv.executeSql("select open_id from t_user ").print();
try {
senv.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
{code}
The error is as follows:
{code:java}
// code placeholder
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.t_user'.
Table options are:
'connector'='jdbc'
'driver'='org.apache.phoenix.jdbc.PhoenixDriver'
'table-name'='t_user'
'url'='jdbc:phoenix://hadoop112:2181'
at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:265)
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100)
at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:659)
at
com.iyunbao.app.dim.dim_agt_account_cloud_hhbak.main(dim_agt_account_cloud_hhbak.java:159)
Caused by: java.lang.IllegalStateException: Cannot handle such jdbc url:
jdbc:phoenix://hadoop112:2181
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
at
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.validateConfigOptions(JdbcDynamicTableFactory.java:281)
at
org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.createDynamicTableSource(JdbcDynamicTableFactory.java:180)
at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
... 19 more
Process finished with exit code 1
{code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)