This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new bf2cbc3  Flink: Rename FlinkTableOptions to more generic 
FlinkConfigOptions
bf2cbc3 is described below

commit bf2cbc3d6cc4d8fbb261cc48de75592f1caa807a
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Mon Jun 28 06:00:35 2021 -0700

    Flink: Rename FlinkTableOptions to more generic FlinkConfigOptions
---
 .../{FlinkTableOptions.java => FlinkConfigOptions.java} |  4 ++--
 .../org/apache/iceberg/flink/source/FlinkSource.java    | 17 +++++++++--------
 .../java/org/apache/iceberg/flink/FlinkTestBase.java    |  2 +-
 .../apache/iceberg/flink/source/TestFlinkScanSql.java   |  6 +++---
 4 files changed, 15 insertions(+), 14 deletions(-)

diff --git 
a/flink/src/main/java/org/apache/iceberg/flink/FlinkTableOptions.java 
b/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
similarity index 96%
rename from flink/src/main/java/org/apache/iceberg/flink/FlinkTableOptions.java
rename to flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
index 145190c..067abe8 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkTableOptions.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
@@ -23,9 +23,9 @@ package org.apache.iceberg.flink;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
-public class FlinkTableOptions {
+public class FlinkConfigOptions {
 
-  private FlinkTableOptions() {
+  private FlinkConfigOptions() {
   }
 
   public static final ConfigOption<Boolean> 
TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM =
diff --git 
a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java 
b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
index 84507c4..a3263d2 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
@@ -36,8 +36,8 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.flink.FlinkTableOptions;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.io.FileIO;
@@ -48,11 +48,11 @@ public class FlinkSource {
   }
 
   /**
-   * Initialize a {@link Builder} to read the data from iceberg table. 
Equivalent to {@link TableScan}.
-   * See more options in {@link ScanContext}.
+   * Initialize a {@link Builder} to read the data from iceberg table. 
Equivalent to {@link TableScan}. See more options
+   * in {@link ScanContext}.
    * <p>
-   * The Source can be read static data in bounded mode. It can also 
continuously check the arrival of new data and
-   * read records incrementally.
+   * The Source can be read static data in bounded mode. It can also 
continuously check the arrival of new data and read
+   * records incrementally.
    * <ul>
    *   <li>Without startSnapshotId: Bounded</li>
    *   <li>With startSnapshotId and with endSnapshotId: Bounded</li>
@@ -222,10 +222,11 @@ public class FlinkSource {
 
     int inferParallelism(FlinkInputFormat format, ScanContext context) {
       int parallelism = 
readableConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
-      if 
(readableConfig.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM))
 {
-        int maxInferParallelism = 
readableConfig.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
+      if 
(readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM))
 {
+        int maxInferParallelism = readableConfig.get(FlinkConfigOptions
+            .TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
         Preconditions.checkState(maxInferParallelism >= 1,
-            
FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " 
cannot be less than 1");
+            
FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " 
cannot be less than 1");
         int splitNum;
         try {
           FlinkInputSplit[] splits = format.createInputSplits(0);
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java 
b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index 2810326..1ee000c 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -79,7 +79,7 @@ public abstract class FlinkTestBase extends TestBaseUtils {
               .build();
 
           TableEnvironment env = TableEnvironment.create(settings);
-          
env.getConfig().getConfiguration().set(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM,
 false);
+          
env.getConfig().getConfiguration().set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM,
 false);
           tEnv = env;
         }
       }
diff --git 
a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java 
b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
index f99e4b6..9af1b7c 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
@@ -38,7 +38,7 @@ import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.flink.FlinkTableOptions;
+import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -158,7 +158,7 @@ public class TestFlinkScanSql extends TestFlinkSource {
 
     // 2 splits and max infer parallelism is 1 (max < splits num), the 
parallelism is  1
     Configuration configuration = new Configuration();
-    
configuration.setInteger(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX,
 1);
+    
configuration.setInteger(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX,
 1);
     parallelism = FlinkSource.forRowData()
         .flinkConf(configuration)
         .inferParallelism(flinkInputFormat, ScanContext.builder().build());
@@ -171,7 +171,7 @@ public class TestFlinkScanSql extends TestFlinkSource {
     Assert.assertEquals("Should produce the expected parallelism.", 1, 
parallelism);
 
     // 2 splits, infer parallelism is disabled, the parallelism is flink 
default parallelism 1
-    
configuration.setBoolean(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM,
 false);
+    
configuration.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM,
 false);
     parallelism = FlinkSource.forRowData()
         .flinkConf(configuration)
         .inferParallelism(flinkInputFormat, 
ScanContext.builder().limit(3).build());

Reply via email to