[ 
https://issues.apache.org/jira/browse/FLINK-24285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415075#comment-17415075
 ] 

James Kim commented on FLINK-24285:
-----------------------------------

According to 
[https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/csv/],
 I needed to add a separate CSV format dependency to pom.xml.
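
For anyone hitting the same error, here is a minimal sketch of the addition. It assumes the flink-csv artifact named on that page and the same 1.13.2 version as the other Flink dependencies in the pom.xml quoted below:

{code:xml}
<!-- CSV format for the Table API; version assumed to match the other Flink artifacts -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.13.2</version>
</dependency>
{code}

This goes inside the existing <dependencies> element. With the artifact on the classpath, the 'csv' identifier in the table's 'format' option should resolve to a format factory.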

> Flink Table API Could not find any format factory for identifier 'csv' in the 
> classpath.
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-24285
>                 URL: https://issues.apache.org/jira/browse/FLINK-24285
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Client
>    Affects Versions: 1.13.2
>         Environment: Ubuntu 18.04
>            Reporter: James Kim
>            Priority: Critical
>
> I'm trying to read a CSV file from S3-compatible storage over the s3a protocol 
> from Java code.
>  
> This is the main class that I have:
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.api.TableResult;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.CloseableIterator;
> public class Main {
>     public static void main(String[] args) {
>         // create a TableEnvironment for batch or streaming execution
>         EnvironmentSettings settings = EnvironmentSettings
>                 .newInstance()
>                 .inBatchMode()
>                 .build();
>         TableEnvironment tableEnv = TableEnvironment.create(settings);
>         // create an input Table
>         TableResult tempResult = tableEnv.executeSql(
> //                "create temporary table ATHLETES (\n" +
>                         "create table ATHLETES (\n" +
>                 "name varchar,\n" +
>                 "country varchar,\n" +
>                 "sport varchar\n" +
>                 ") with (\n" +
>                 "'connector' = 'filesystem',\n" +
>                 "'path'='s3a://testbucket/expFolder/2020_Tokyo_Olympics/Athletes.csv',\n" +
>                 "'format'='csv'\n" +
>                 ")\n");
>         // query the table created above
>         TableResult table2 = tableEnv.executeSql("select * from ATHLETES");
>     }
> }
> {code}
> However, when I run this code, I get an exception at the executeSql call. The 
> error log is the following: 
> {code:java}
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.ATHLETES'.
> Table options are:
> 'connector'='filesystem'
> 'format'='csv'
> 'path'='s3a://testbucket/james_experiment/2020_Tokyo_Olympics/Athletes.csv'
>  at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
>  at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
>  at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
>  at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
>  at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
>  at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
>  at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
>  at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>  at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>  at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>  at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>  at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
>  at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
>  at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
>  at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
>  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
>  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
>  at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>  at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
>  at Main.main(Main.java:31)
> Caused by: org.apache.flink.table.api.ValidationException: Could not find any format factory for identifier 'csv' in the classpath.
>  at org.apache.flink.table.filesystem.FileSystemTableSource.<init>(FileSystemTableSource.java:97)
>  at org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:74)
>  at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134)
>  ... 19 more
> {code}
> The exception says that Flink is unable to create a source for reading table 
> 'default_catalog.default_database.ATHLETES', and the root cause further down 
> is "Could not find any format factory for identifier 'csv' in the classpath".
> The pom.xml I have is the following:
> {code:xml}
> <?xml version="1.0" encoding="UTF-8"?>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>     <modelVersion>4.0.0</modelVersion>
>     <groupId>groupId</groupId>
>     <artifactId>flink-ecs-sample</artifactId>
>     <version>1.0-SNAPSHOT</version>
>     <properties>
>         <maven.compiler.source>11</maven.compiler.source>
>         <maven.compiler.target>11</maven.compiler.target>
>     </properties>
>     <dependencies>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>             <version>1.13.2</version>
>             <scope>compile</scope>
> <!--            <scope>provided</scope>-->
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-planner-blink_2.11</artifactId>
>             <version>1.13.2</version>
>             <scope>compile</scope>
> <!--            <scope>provided</scope>-->
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-streaming-scala_2.11</artifactId>
>             <version>1.13.2</version>
>             <scope>compile</scope>
> <!--            <scope>provided</scope>-->
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-common</artifactId>
>             <version>1.13.2</version>
>             <scope>compile</scope>
> <!--            <scope>provided</scope>-->
>         </dependency>
>     </dependencies>
> </project>
> {code}
>  
> I'm going through the docs 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/common/])
>  but I'm not sure why I'm getting this error. Could I get some help?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
