[
https://issues.apache.org/jira/browse/FLINK-25078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17450276#comment-17450276
]
Martijn Visser commented on FLINK-25078:
----------------------------------------
[~liangxinli] Thanks for opening the ticket. Per
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/,
Flink currently supports MySQL, Postgres and Derby as JDBC driver. I think
this could be the cause of your issue, what do you think?
> An error is reported when the flink SQL connects to Phoenix
> -----------------------------------------------------------
>
> Key: FLINK-25078
> URL: https://issues.apache.org/jira/browse/FLINK-25078
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Environment: Maven configuration environment is as follows:
> {code:java}
> //代码占位符
> <?xml version="1.0" encoding="UTF-8"?>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd">
> <parent>
> <artifactId>iyb-db-account</artifactId>
> <groupId>org.iyunbao</groupId>
> <version>1.0-SNAPSHOT</version>
> </parent>
> <modelVersion>4.0.0</modelVersion>
> <artifactId>iyb-realtime</artifactId>
> <properties>
> <java.version>1.8</java.version>
> <maven.compiler.source>${java.version}</maven.compiler.source>
> <maven.compiler.target>${java.version}</maven.compiler.target>
> <flink.version>1.12.0</flink.version>
> <scala.version>2.12</scala.version>
> <hadoop.version>3.1.3</hadoop.version>
> </properties>
> <dependencies>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-hbase-2.2_2.11</artifactId>
> <version>1.12.3</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-java</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-java_${scala.version}</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-planner_${scala.version}</artifactId>
> <version>${flink.version}</version>
> <scope>provided</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-kafka_${scala.version}</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-clients_${scala.version}</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-cep_${scala.version}</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-json</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>com.alibaba</groupId>
> <artifactId>fastjson</artifactId>
> <version>1.2.68</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-shaded-hadoop2</artifactId>
> <version>1.6.2</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-jdbc_2.12</artifactId>
> <version>1.8.0</version>
> </dependency>
> <!-- dependency>
> <groupId>org.codehaus.janino</groupId>
> <artifactId>janino</artifactId>
> <version>3.0.8</version>
> </dependency -->
> <!--如果保存检查点到hdfs上,需要引入此依赖-->
> <dependency>
> <groupId>org.apache.hadoop</groupId>
> <artifactId>hadoop-client</artifactId>
> <version>${hadoop.version}</version>
> </dependency>
> <!--Flink默认使用的是slf4j记录日志,相当于一个日志的接口,我们这里使用log4j作为具体的日志实现-->
> <dependency>
> <groupId>org.slf4j</groupId>
> <artifactId>slf4j-api</artifactId>
> <version>1.7.25</version>
> </dependency>
> <dependency>
> <groupId>org.slf4j</groupId>
> <artifactId>slf4j-log4j12</artifactId>
> <version>1.7.25</version>
> </dependency>
> <dependency>
> <groupId>org.apache.logging.log4j</groupId>
> <artifactId>log4j-to-slf4j</artifactId>
> <version>2.14.0</version>
> </dependency>
> <!--lomback插件依赖-->
> <dependency>
> <groupId>org.projectlombok</groupId>
> <artifactId>lombok</artifactId>
> <version>1.18.12</version>
> </dependency>
> <dependency>
> <groupId>mysql</groupId>
> <artifactId>mysql-connector-java</artifactId>
> <version>5.1.47</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>com.alibaba.blink</groupId>
> <artifactId>flink-jdbc</artifactId>
> <version>1.5.1</version>
> </dependency>
> <dependency>
> <groupId>org.apache.phoenix</groupId>
> <artifactId>phoenix-core</artifactId>
> <version>5.0.0-HBase-2.0</version>
> </dependency>
> <!-- <dependency>
> <groupId>org.apache.hbase</groupId>
> <artifactId>hbase-client</artifactId>
> <version>2.0.3</version>
> </dependency>-->
> <!-- <dependency>
> <groupId>org.apache.phoenix</groupId>
> <artifactId>phoenix-spark</artifactId>
> <version>5.0.0-HBase-2.0</version>
> <exclusions>
> <exclusion>
> <groupId>org.glassfish</groupId>
> <artifactId>javax.el</artifactId>
> </exclusion>
> </exclusions>
> </dependency>-->
> <dependency>
> <groupId>commons-beanutils</groupId>
> <artifactId>commons-beanutils-core</artifactId>
> <version>1.8.0</version>
> </dependency>
> <dependency>
> <groupId>redis.clients</groupId>
> <artifactId>jedis</artifactId>
> <version>3.3.0</version>
> </dependency>
> </dependencies>
> <build>
> <plugins>
> <plugin>
> <groupId>org.apache.maven.plugins</groupId>
> <artifactId>maven-assembly-plugin</artifactId>
> <version>3.0.0</version>
> <configuration>
> <descriptorRefs>
> <descriptorRef>jar-with-dependencies</descriptorRef>
> </descriptorRefs>
> </configuration>
> <executions>
> <execution>
> <id>make-assembly</id>
> <phase>package</phase>
> <goals>
> <goal>single</goal>
> </goals>
> </execution>
> </executions>
> </plugin>
> </plugins>
> </build>
> </project> {code}
> Reporter: xinli liang
> Priority: Major
>
> I want to know where the problem is. Thanks
>
> The code is as follows
> {code:java}
> //代码占位符
> package com.iyunbao.app.dim;
> import com.iyunbao.common.GmallConfig;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> /**
> * @author liangxinli
> * @create 2021-11-03 10:33
> */
> public class dim_agt_account_cloud_hhbak {
> public static void main(String[] args) {
> //创建执行环境
> StreamExecutionEnvironment senv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> //todo 创建表环境 bug比较多,建议指定版本
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .inStreamingMode()
> .useBlinkPlanner()
> .build();
> //从流环境转换成表环境
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(senv,settings);
> try {
> Class.forName(GmallConfig.PHOENIX_DRIVER);
> } catch (ClassNotFoundException e) {
> e.printStackTrace();
> }
>
> //把mysql 中的表映射成一个输出临时表
> tableEnv.executeSql(
> "Create Table `t_user` (" +
> " `id` varchar ," +
> " `open_id` varchar ," +
> " `user_name` varchar ," +
> " `password` varchar ," +
> " `type` tinyint ," +
> " `login_time` timestamp ," +
> " `extra` varchar ," +
> " `target` varchar ," +
> " `platform_id` bigint ," +
> " `status` tinyint ," +
> " `is_bt` varchar ," +
> " `settle_type` varchar ," +
> " `account_type` varchar ," +
> " `is_rate_visable` varchar ," +
> " `create_time` timestamp ," +
> " `creator` bigint ," +
> " `update_time` timestamp ," +
> " `updater` bigint ," +
> " `valid` varchar ," +
> " `mobile` varchar ," +
> " `is_relation_role` varchar ," +
> " `source` varchar ," +
> " `unique_no` varchar ," +
> " `is_bind_bt` varchar , " +
> "PRIMARY KEY (id) NOT ENFORCED" +
> ")" +
> "with (" +
> " 'connector' = 'jdbc'," +
> " 'driver' =
> 'org.apache.phoenix.jdbc.PhoenixDriver',"+
> " 'url' = 'jdbc:phoenix://hadoop112:2181'," +
> " 'table-name' = 't_user'" +
> " )");
> //todo 对数据的处理方式
> tableEnv.executeSql("select open_id from t_user ").print();
> try {
> senv.execute();
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
> }
> {code}
>
>
> The error is as follows:
> {code:java}
> //代码占位符
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Unable to create a source for reading table
> 'default_catalog.default_database.t_user'.Table options are:'connector'='jdbc'
> 'driver'='org.apache.phoenix.jdbc.PhoenixDriver'
> 'table-name'='t_user'
> 'url'='jdbc:phoenix://hadoop112:2181'
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:265)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:659)
> at
> com.iyunbao.app.dim.dim_agt_account_cloud_hhbak.main(dim_agt_account_cloud_hhbak.java:159)
> Caused by: java.lang.IllegalStateException: Cannot handle such jdbc url:
> jdbc:phoenix://hadoop112:2181
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
> at
> org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.validateConfigOptions(JdbcDynamicTableFactory.java:281)
> at
> org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.createDynamicTableSource(JdbcDynamicTableFactory.java:180)
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
> ... 19 moreProcess finished with exit code 1
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)