xinli liang created FLINK-25078:
-----------------------------------

             Summary: Flink SQL reports an error when connecting to Phoenix
                 Key: FLINK-25078
                 URL: https://issues.apache.org/jira/browse/FLINK-25078
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
         Environment: The Maven configuration is as follows:
{code:java}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>iyb-db-account</artifactId>
        <groupId>org.iyunbao</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>iyb-realtime</artifactId>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.12.0</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2_2.11</artifactId>
            <version>1.12.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop2</artifactId>
            <version>1.6.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.12</artifactId>
            <version>1.8.0</version>
        </dependency>

        <!-- dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
            <version>3.0.8</version>
        </dependency -->
        <!-- Required if checkpoints are stored on HDFS -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- Flink logs via slf4j by default, which is only a logging facade; here we use log4j as the concrete logging implementation -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>


        <!-- Lombok dependency -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.blink</groupId>
            <artifactId>flink-jdbc</artifactId>
            <version>1.5.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>5.0.0-HBase-2.0</version>
        </dependency>

       <!-- <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.0.3</version>
        </dependency>-->

      <!--  <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>5.0.0-HBase-2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.glassfish</groupId>
                    <artifactId>javax.el</artifactId>
                </exclusion>
            </exclusions>
        </dependency>-->

        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils-core</artifactId>
            <version>1.8.0</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project> {code}
            Reporter: xinli liang


I want to know where the problem is. Thanks.

 

The code is as follows:
{code:java}
package com.iyunbao.app.dim;

import com.iyunbao.common.GmallConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author liangxinli
 * @create 2021-11-03 10:33
 */
public class dim_agt_account_cloud_hhbak {
    public static void main(String[] args) {

        // create the stream execution environment
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO create the table environment; it has quite a few bugs, so explicitly choosing the planner is recommended
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        // derive a table environment from the stream environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(senv, settings);

        try {
            Class.forName(GmallConfig.PHOENIX_DRIVER);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
  
        // map the table in MySQL to a temporary output table
        tableEnv.executeSql(
                "Create Table `t_user` (" +
                        " `id` varchar  ," +
                        " `open_id` varchar  ," +
                        " `user_name` varchar  ," +
                        " `password` varchar    ," +
                        " `type` tinyint  ," +
                        " `login_time` timestamp  ," +
                        " `extra` varchar  ," +
                        " `target` varchar  ," +
                        " `platform_id` bigint  ," +
                        " `status` tinyint  ," +
                        " `is_bt` varchar    ," +
                        " `settle_type` varchar  ," +
                        " `account_type` varchar  ," +
                        " `is_rate_visable` varchar    ," +
                        " `create_time` timestamp  ," +
                        " `creator` bigint  ," +
                        " `update_time` timestamp  ," +
                        " `updater` bigint  ," +
                        " `valid` varchar  ," +
                        " `mobile` varchar  ," +
                        " `is_relation_role` varchar    ," +
                        " `source` varchar     ," +
                        " `unique_no` varchar  ," +
                        " `is_bind_bt` varchar ,   " +
                        "PRIMARY KEY (id) NOT ENFORCED" +
                        ")" +
                        "with (" +
                        "  'connector' = 'jdbc'," +
                        "   'driver' = 
'org.apache.phoenix.jdbc.PhoenixDriver',"+
                        "   'url' = 'jdbc:phoenix://hadoop112:2181'," +
                        "   'table-name' = 't_user'" +
                        " )");

        // TODO how to process the data
        tableEnv.executeSql("select open_id  from t_user  ").print();

        try {
            senv.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
 {code}
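As a side check, the Phoenix driver can be exercised outside of Flink SQL with plain JDBC. A minimal sketch, assuming the thick-driver URL form {{jdbc:phoenix:<zk-quorum>:<port>}} (no double slash) against the same hadoop112:2181 quorum; the class name is invented for illustration:
{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Connectivity check that bypasses the Flink Table API entirely.
// Assumption: thick-driver URL "jdbc:phoenix:<zk-quorum>:<port>" (no "//").
public class PhoenixConnectivityCheck {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:hadoop112:2181");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT open_id FROM t_user LIMIT 10")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}
{code}
If this works while the Flink SQL job does not, the problem is in the Flink JDBC connector rather than in Phoenix or ZooKeeper connectivity.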
 

 

The error is as follows:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.t_user'.

Table options are:

'connector'='jdbc'
'driver'='org.apache.phoenix.jdbc.PhoenixDriver'
'table-name'='t_user'
'url'='jdbc:phoenix://hadoop112:2181'
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:265)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:659)
    at com.iyunbao.app.dim.dim_agt_account_cloud_hhbak.main(dim_agt_account_cloud_hhbak.java:159)
Caused by: java.lang.IllegalStateException: Cannot handle such jdbc url: jdbc:phoenix://hadoop112:2181
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
    at org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.validateConfigOptions(JdbcDynamicTableFactory.java:281)
    at org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.createDynamicTableSource(JdbcDynamicTableFactory.java:180)
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
    ... 19 more

Process finished with exit code 1
 {code}
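Judging from the {{Caused by}} frame, the check that fails is the dialect lookup in flink-connector-jdbc: in 1.12 the connector resolves a JdbcDialect from the URL prefix, and only Derby, MySQL, and PostgreSQL dialects are bundled, so a {{jdbc:phoenix:}} URL matches nothing. A simplified sketch of that check (illustrative only, not the actual Flink source; the class and constant names here are invented):
{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Rough illustration of JdbcDynamicTableFactory.validateConfigOptions
// from the stack trace: resolve a dialect by URL prefix, fail otherwise.
public class DialectCheckSketch {
    // The 1.12 connector ships dialects only for these URL prefixes.
    static final List<String> SUPPORTED_URL_PREFIXES =
            Arrays.asList("jdbc:derby:", "jdbc:mysql:", "jdbc:postgresql:");

    static Optional<String> dialectFor(String url) {
        return SUPPORTED_URL_PREFIXES.stream().filter(url::startsWith).findFirst();
    }

    public static void main(String[] args) {
        String url = "jdbc:phoenix://hadoop112:2181";
        if (!dialectFor(url).isPresent()) {
            // This is exactly the state the exception above reports.
            throw new IllegalStateException("Cannot handle such jdbc url: " + url);
        }
    }
}
{code}
So the built-in 'jdbc' connector cannot talk to Phoenix in this version; putting the driver class on the classpath does not add a dialect for it.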


