mumuhhh commented on code in PR #7866:
URL: https://github.com/apache/iceberg/pull/7866#discussion_r1240482278


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java:
##########
@@ -105,18 +105,21 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 
   @Override
   public DynamicTableSink createDynamicTableSink(Context context) {
-    ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
+    ObjectIdentifier objectIdentifier = context.getObjectIdentifier();

Review Comment:
   Executing the following code (imports added for completeness; `Files.createTempDir()` is Guava's):
   ```
   import java.io.File;

   import com.google.common.io.Files;
   import org.apache.flink.table.api.DataTypes;
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.Schema;
   import org.apache.flink.table.api.Table;
   import org.apache.flink.table.api.TableDescriptor;
   import org.apache.flink.table.api.TableEnvironment;

   File warehouseDir = Files.createTempDir();
   EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();

   TableEnvironment tEnv = TableEnvironment.create(settings);
   // Source: an anonymous datagen table producing three string rows.
   Table table =
       tEnv.from(
           TableDescriptor.forConnector("datagen")
               .schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build())
               .option("number-of-rows", "3")
               .build());

   // Sink: an inline (anonymous) Iceberg table descriptor.
   TableDescriptor descriptor =
       TableDescriptor.forConnector("iceberg")
           .schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build())
           .option("catalog-name", "hadoop_test")
           .option("catalog-type", "hadoop")
           .option("catalog-database", "test_db")
           .option("catalog-table", "test")
           .option("warehouse", warehouseDir.getAbsolutePath())
           .build();

   table.insertInto(descriptor).execute();
   ```
   produces the following exception:
   ```
   Unable to create a sink for writing table '*anonymous_iceberg$2*'.
   
   Table options are:
   
   'catalog-database'='test_db'
   'catalog-name'='hadoop_test'
   'catalog-table'='test'
   'catalog-type'='hadoop'
   'connector'='iceberg'
   'warehouse'='C:\Users\huawei\AppData\Local\Temp\1687560451312-0'
   org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table '*anonymous_iceberg$2*'.
   
   Table options are:
   
   'catalog-database'='test_db'
   'catalog-name'='hadoop_test'
   'catalog-table'='test'
   'catalog-type'='hadoop'
   'connector'='iceberg'
   'warehouse'='C:\Users\huawei\AppData\Local\Temp\1687560451312-0'
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:262)
        at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:434)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:227)
        at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:185)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.map(TraversableLike.scala:285)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:782)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:861)
        at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:56)
        at com.test.TestFlinkAnonymousTable.testWriteAnonymousTable(TestFlinkAnonymousTable.java:48)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
        at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
        at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
        at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
        at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
        at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
        at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
        at org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
        at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
        at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
        at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
        at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
        at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133)
        at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
        at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
        at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
   Caused by: org.apache.flink.table.api.TableException: This ObjectIdentifier instance refers to an anonymous object, hence it cannot be converted to ObjectPath and cannot be serialized.
        at org.apache.flink.table.catalog.ObjectIdentifier.toObjectPath(ObjectIdentifier.java:112)
        at org.apache.iceberg.flink.FlinkDynamicTableFactory.createDynamicTableSink(FlinkDynamicTableFactory.java:108)
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259)
        ... 63 more
   ```
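   The `Caused by` shows the real problem: `ObjectIdentifier.toObjectPath()` throws as soon as the identifier is anonymous, which is exactly what `table.insertInto(descriptor)` produces, so the factory must not convert eagerly. The diff above keeps the `ObjectIdentifier` instead. As a rough illustration of that contract, here is a self-contained sketch; the classes below are simplified stand-ins for Flink's `org.apache.flink.table.catalog` types, not the real API:
   ```
   // Simplified model of a table identifier: a null catalog marks it anonymous.
   final class PathLikeIdentifier {
       private final String catalog;
       private final String database;
       private final String object;

       private PathLikeIdentifier(String catalog, String database, String object) {
           this.catalog = catalog;
           this.database = database;
           this.object = object;
       }

       static PathLikeIdentifier of(String catalog, String database, String object) {
           return new PathLikeIdentifier(catalog, database, object);
       }

       static PathLikeIdentifier ofAnonymous(String object) {
           return new PathLikeIdentifier(null, null, object);
       }

       boolean isAnonymous() {
           return catalog == null;
       }

       // Mirrors ObjectIdentifier.toObjectPath(): only a named table maps to a
       // database/table pair, so converting an anonymous identifier must fail.
       String toDatabaseDotTable() {
           if (isAnonymous()) {
               throw new IllegalStateException(
                   "refers to an anonymous object, hence it cannot be converted to ObjectPath");
           }
           return database + "." + object;
       }
   }

   public class AnonymousIdentifierDemo {
       public static void main(String[] args) {
           // Named table: eager conversion is safe.
           PathLikeIdentifier named = PathLikeIdentifier.of("hadoop_test", "test_db", "test");
           System.out.println(named.toDatabaseDotTable()); // prints "test_db.test"

           // Anonymous table (e.g. an inline TableDescriptor sink): conversion throws,
           // so a factory should hold on to the identifier and convert only when it
           // has verified the table is named.
           PathLikeIdentifier anon = PathLikeIdentifier.ofAnonymous("*anonymous_iceberg$2*");
           try {
               anon.toDatabaseDotTable();
           } catch (IllegalStateException e) {
               System.out.println("conversion failed: " + e.getMessage());
           }
       }
   }
   ```
   This is only a model of the failure mode; the actual fix in this PR is simply to stop calling `toObjectPath()` up front.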



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

