openinx commented on a change in pull request #1956:
URL: https://github.com/apache/iceberg/pull/1956#discussion_r568279327



##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
##########
@@ -136,13 +133,23 @@ public void testCreateTableIfNotExists() {
 
     sql("DROP TABLE tl");
     AssertHelpers.assertThrows("Table 'tl' should be dropped",
-        NoSuchTableException.class, "Table does not exist: db.tl", () -> table("tl"));
+        NoSuchTableException.class,
+        "Table does not exist: " + getFullQualifiedTableName("tl"),
+        () -> table("tl"));
 
-    sql("CREATE TABLE IF NO EXISTS tl(id BIGINT)");
+    sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT)");
     Assert.assertEquals(Maps.newHashMap(), table("tl").properties());
 
-    sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT) WITH ('location'='/tmp/location')");
-    Assert.assertEquals("Should still be the old table.", Maps.newHashMap(), table("tl").properties());
+    final String uuid = UUID.randomUUID().toString();

Review comment:
       Thanks a lot for improving the unit tests!

##########
File path: flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
##########
@@ -119,6 +123,12 @@ protected String warehouseRoot() {
     }
   }
 
+  protected String getFullQualifiedTableName(String tableName) {
+    final List<String> levels = new ArrayList<>(Arrays.asList(icebergNamespace.levels()));

Review comment:
       Nit: in Apache Iceberg, we usually use the unified `Lists.newArrayList` to create a new ArrayList.
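
For illustration only, here is a minimal sketch of how the helper might look with that idiom. It assumes the method appends the table name to the namespace levels and joins them with dots, which the diff does not show. The nested `Lists` class is a stand-in for Guava's `Lists.newArrayList(E...)` so that the sketch compiles without dependencies, and the `namespaceLevels` parameter is a hypothetical replacement for `icebergNamespace.levels()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class QualifiedNameSketch {

  // Stand-in for Guava's Lists.newArrayList(E...); real code would import Guava's Lists instead.
  static final class Lists {
    private Lists() {
    }

    @SafeVarargs
    static <E> List<E> newArrayList(E... elements) {
      List<E> list = new ArrayList<>(elements.length);
      Collections.addAll(list, elements);
      return list;
    }
  }

  // Hypothetical signature: namespaceLevels plays the role of icebergNamespace.levels().
  static String fullQualifiedTableName(String[] namespaceLevels, String tableName) {
    // Copy the levels into a mutable list, then append the table name as the last part.
    List<String> levels = Lists.newArrayList(namespaceLevels);
    levels.add(tableName);
    // Assumption: the qualified name is dot-separated, as in "db.tl".
    return String.join(".", levels);
  }

  public static void main(String[] args) {
    System.out.println(fullQualifiedTableName(new String[] {"db"}, "tl"));
  }
}
```

The list has to be mutable because the table name is appended to it, which is why a plain `Arrays.asList` would not be enough on its own.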

##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
##########
@@ -107,7 +107,7 @@ public void testRenameTable() {
         () -> getTableEnv().from("tl")
     );
     Assert.assertEquals(
-        Collections.singletonList(TableColumn.of("id", DataTypes.BIGINT())),
+        Collections.singletonList(TableColumn.physical("id", DataTypes.BIGINT())),

Review comment:
       @stevenzwu, do you think this issue needs to be fixed in this PR? I think we'd better to...

##########
File path: flink/src/test/java/org/apache/iceberg/flink/MiniClusterBase.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * It will start a mini cluster with classloader.check-leaked-classloader=false, so that we won't break the unit tests
+ * because of the class loader leak issue. In our iceberg integration tests, there're some that will assert the results
+ * after finished the flink jobs, so actually we may access the class loader that has been closed by the flink task
+ * managers if we enable the switch classloader.check-leaked-classloader by default.
+ */
+public class MiniClusterBase extends TestBaseUtils {
+
+  private static final int DEFAULT_PARALLELISM = 4;
+
+  public static final Configuration CONFIG = new Configuration()
+      // disable classloader check as Avro may cache class/object in the serializers.
+      .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+
+  @ClassRule
+  public static MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(

Review comment:
       I saw that [TestFlinkIcebergSinkV2](https://github.com/apache/iceberg/pull/1956/files#diff-13e2e5b52d0effe51e1b470df77cb08b5ec8cc4f3a7f0fd4e51ee212fc83f76aR73)
 also defines a similar mini cluster resource. How about extracting this into a
small method so that TestFlinkIcebergSinkV2 can reuse it? Any mini cluster
resource defined in the future had better reuse this one as well, because it is
easy for developers to forget to disable the `CHECK_LEAKED_CLASSLOADER` switch.
   
   ```java
     @ClassRule
     public static MiniClusterWithClientResource miniClusterResource = createMiniClusterResource();
   
     @ClassRule
     public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
   
     public static MiniClusterWithClientResource createMiniClusterResource() {
       return new MiniClusterWithClientResource(
           new MiniClusterResourceConfiguration.Builder()
               .setNumberTaskManagers(1)
               .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
               .setConfiguration(CONFIG)
               .build());
     }
   ```
   
   The TestFlinkIcebergSinkV2 could just use: 
   
   ```java
   @ClassRule
   public static MiniClusterWithClientResource miniClusterResource = MiniClusterBase.createMiniClusterResource();
   ```

##########
File path: flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
##########
@@ -37,7 +43,7 @@ public static Actions forTable(StreamExecutionEnvironment env, Table table) {
   }
 
   public static Actions forTable(Table table) {
-    return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(), table);
+    return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(CONFIG), table);

Review comment:
       Thanks a lot for this.






