iQiuyu-0821 commented on issue #2468:
URL: https://github.com/apache/iceberg/issues/2468#issuecomment-860508692


   I have the same problem.
   
   **runtime env**
   flink: 1.12.1
   iceberg-flink-runtime: 0.12.0, built from master
   **catalog info**
   `
   catalogs: 
       - name: iceberg
         type: iceberg
         catalog-type: hadoop
         warehouse: hdfs://localhost:9000/flink-iceberg/warehouse
         property-version: 1
         clients: 5
   `
   
   **execute sql**
   `
   Flink SQL> set execution.type = streaming;
   [INFO] Session property has been set.
   
   Flink SQL> set table.dynamic-table-options.enabled = true;
   [INFO] Session property has been set.
   
   Flink SQL> select * from iceberg.iceberg_db.t1 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/;
   [ERROR] Could not execute SQL statement. Reason:
   java.lang.ClassCastException: org.apache.iceberg.hadoop.HadoopCatalog cannot be cast to org.apache.iceberg.catalog.Catalog
   
   Flink SQL> 
   `
   **error info:**
   `2021-06-14 16:35:57,838 INFO  org.apache.iceberg.BaseMetastoreCatalog [] - Table loaded by catalog: iceberg.iceberg_db.t1
   2021-06-14 16:35:59,910 WARN  org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement.
   org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL query.
        at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:548)
 ~[flink-sql-client_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:374)
 ~[flink-sql-client_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:648) 
~[flink-sql-client_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:323) 
~[flink-sql-client_2.11-1.12.1.jar:1.12.1]
        at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_171]
        at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:214) 
[flink-sql-client_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:144) 
[flink-sql-client_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:115) 
[flink-sql-client_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) 
[flink-sql-client_2.11-1.12.1.jar:1.12.1]
   Caused by: java.lang.IllegalArgumentException: Cannot initialize Catalog, org.apache.iceberg.hadoop.HadoopCatalog does not implement Catalog.
        at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:186) 
~[?:?]
        at 
org.apache.iceberg.flink.CatalogLoader$HadoopCatalogLoader.loadCatalog(CatalogLoader.java:79)
 ~[?:?]
        at 
org.apache.iceberg.flink.TableLoader$CatalogTableLoader.open(TableLoader.java:108)
 ~[?:?]
        at 
org.apache.iceberg.flink.source.FlinkSource$Builder.buildFormat(FlinkSource.java:178)
 ~[?:?]
        at 
org.apache.iceberg.flink.source.FlinkSource$Builder.build(FlinkSource.java:204) 
~[?:?]
        at 
org.apache.iceberg.flink.IcebergTableSource.createDataStream(IcebergTableSource.java:110)
 ~[?:?]
        at 
org.apache.iceberg.flink.IcebergTableSource.access$000(IcebergTableSource.java:49)
 ~[?:?]
        at 
org.apache.iceberg.flink.IcebergTableSource$1.produceDataStream(IcebergTableSource.java:163)
 ~[?:?]
        at 
org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan.createSourceTransformation(CommonPhysicalTableSourceScan.scala:88)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:91)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:44)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:44)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1321)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.getPipeline(StreamTableEnvironmentImpl.java:328)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$createPipeline$1(ExecutionContext.java:287)
 ~[flink-sql-client_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:256)
 ~[flink-sql-client_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createPipeline(ExecutionContext.java:282)
 ~[flink-sql-client_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:542)
 ~[flink-sql-client_2.11-1.12.1.jar:1.12.1]
        ... 8 more
   Caused by: java.lang.ClassCastException: org.apache.iceberg.hadoop.HadoopCatalog cannot be cast to org.apache.iceberg.catalog.Catalog
        at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:182) 
~[?:?]
        at 
org.apache.iceberg.flink.CatalogLoader$HadoopCatalogLoader.loadCatalog(CatalogLoader.java:79)
 ~[?:?]
        at 
org.apache.iceberg.flink.TableLoader$CatalogTableLoader.open(TableLoader.java:108)
 ~[?:?]
        at 
org.apache.iceberg.flink.source.FlinkSource$Builder.buildFormat(FlinkSource.java:178)
 ~[?:?]
        at 
org.apache.iceberg.flink.source.FlinkSource$Builder.build(FlinkSource.java:204) 
~[?:?]
        at 
org.apache.iceberg.flink.IcebergTableSource.createDataStream(IcebergTableSource.java:110)
 ~[?:?]
        at 
org.apache.iceberg.flink.IcebergTableSource.access$000(IcebergTableSource.java:49)
 ~[?:?]
        at 
org.apache.iceberg.flink.IcebergTableSource$1.produceDataStream(IcebergTableSource.java:163)
 ~[?:?]
        at 
org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan.createSourceTransformation(CommonPhysicalTableSourceScan.scala:88)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:91)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:44)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:44)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1321)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.getPipeline(StreamTableEnvironmentImpl.java:328)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$createPipeline$1(ExecutionContext.java:287)
 ~[flink-sql-client_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:256)
 ~[flink-sql-client_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createPipeline(ExecutionContext.java:282)
 ~[flink-sql-client_2.11-1.12.1.jar:1.12.1]
        at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:542)
 ~[flink-sql-client_2.11-1.12.1.jar:1.12.1]
        ... 8 more
   `
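
   A cast failure between a class and an interface it is expected to implement is often a sign that the two were defined by different class loaders, for example when the same classes are present in more than one jar. That is only a guess from the trace above and is not confirmed in this report. The sketch below is a hypothetical diagnostic helper (`ClassOrigin`, `describe`, and `sameLoader` are made-up names, not part of Iceberg or Flink) that prints which loader and which jar a class came from. It uses only standard JDK calls, and it would need to run in the same class-loading context as the failing query to say anything useful.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper; not part of Iceberg or Flink.
public class ClassOrigin {

    /** Returns "<class name> | loader=<loader> | source=<jar or directory>". */
    public static String describe(Class<?> type) {
        // getClassLoader() returns null for classes defined by the bootstrap loader.
        ClassLoader loader = type.getClassLoader();
        String loaderName = (loader == null) ? "bootstrap" : loader.toString();

        // The code source can be null, e.g. for core JDK classes.
        CodeSource source = type.getProtectionDomain().getCodeSource();
        String location = (source == null || source.getLocation() == null)
                ? "unknown"
                : source.getLocation().toString();

        return type.getName() + " | loader=" + loaderName + " | source=" + location;
    }

    /** True only if both classes were defined by the same class loader instance. */
    public static boolean sameLoader(Class<?> a, Class<?> b) {
        return a.getClassLoader() == b.getClassLoader();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass fully qualified class names as arguments, e.g. the two
        // names that appear in the ClassCastException message.
        for (String name : args) {
            System.out.println(describe(Class.forName(name)));
        }
    }
}
```

   If the two names from the exception report different loaders or different source jars, a duplicated or conflicting jar on the classpath would be a reasonable next thing to check.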




