openinx commented on a change in pull request #1936:
URL: https://github.com/apache/iceberg/pull/1936#discussion_r560714880



##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -79,13 +84,13 @@ public boolean isBounded() {
 
   @Override
   public TableSource<RowData> projectFields(int[] fields) {
-    return new IcebergTableSource(loader, schema, properties, fields, isLimitPushDown, limit, filters);
+    return new IcebergTableSource(loader, schema, properties, fields, isLimitPushDown, limit, filters, readableConfig);
   }
 
   @Override
   public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
     return FlinkSource.forRowData().env(execEnv).tableLoader(loader).project(getProjectedSchema()).limit(limit)
-        .filters(filters).properties(properties).build();
+        .filters(filters).flinkConf(readableConfig).properties(properties).build();

Review comment:
       Nit: I'd like to format this builder chain as follows (it makes the change easier to read):
   
   ```java
     @Override
     public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
       return FlinkSource.forRowData()
           .env(execEnv)
           .tableLoader(loader)
           .project(getProjectedSchema())
           .limit(limit)
           .filters(filters)
           .flinkConf(readableConfig)
           .properties(properties)
           .build();
     }
   ```

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -208,6 +217,30 @@ public FlinkInputFormat buildFormat() {
             .transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
       }
     }
+
+    private DataStream<RowData> createInputDataStream(FlinkInputFormat format, ScanContext context,
+                                                      TypeInformation<RowData> typeInfo) {
+      int parallelism = flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
+      if (flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
+        int max = flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
+        Preconditions.checkState(max >= 1,
+            FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
+        int splitNum = 0;
+        try {
+          FlinkInputSplit[] splits = format.createInputSplits(0);
+          splitNum = splits.length;
+        } catch (IOException e) {
+          throw new UncheckedIOException("Failed to create iceberg input splits for table: " + table, e);
+        }
+
+        parallelism = Math.min(splitNum, max);
+      }
+
+      int limitInt = context.limit() > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) context.limit();
+      parallelism = limitInt > 0 ? Math.min(parallelism, limitInt) : parallelism;

Review comment:
       Nit:  I'd like to make this code more readable: 
   
   ```java
         if (context.limit() > 0) {
           int limit = context.limit() >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) context.limit();
           parallelism = Math.min(parallelism, limit);
         }
   
         // parallelism must be positive.
         parallelism = Math.max(1, parallelism);
   ```
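
The clamp suggested above can be tried in isolation. This is only a minimal sketch: the class name `LimitClampSketch` and the method `clampToLimit` are hypothetical and not part of the PR, and it assumes (as the original ternary implies) that a non-positive limit means "no limit".

```java
// Hypothetical standalone helper, not code from the PR.
final class LimitClampSketch {
  private LimitClampSketch() {
  }

  // Caps the parallelism at the row limit when a limit is set,
  // and never returns a value smaller than 1.
  static int clampToLimit(int parallelism, long limit) {
    int result = parallelism;
    if (limit > 0) {
      // Narrow the long limit to an int without overflowing.
      int limitInt = limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit;
      result = Math.min(result, limitInt);
    }

    // Parallelism must be positive.
    return Math.max(1, result);
  }

  public static void main(String[] args) {
    System.out.println(clampToLimit(8, 3L));   // prints 3: the limit is smaller
    System.out.println(clampToLimit(8, -1L));  // prints 8: no limit set
    System.out.println(clampToLimit(0, 100L)); // prints 1: raised to the minimum
  }
}
```

The final `Math.max(1, ...)` matters when the table yields zero splits, since a source parallelism of 0 would otherwise be requested.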

##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -685,4 +782,60 @@ public void testSqlParseError() {
     AssertHelpers.assertThrows("The NaN is not supported by flink now. ",
         NumberFormatException.class, () -> sql(sqlParseErrorLTE));
   }
+
+  /**
+   * The sql can be executed in both streaming and batch mode, in order to get the parallelism, we convert the flink
+   * Table to flink DataStream, so we only use streaming mode here.
+   *
+   * @throws TableNotExistException table not exist exception
+   */
+  @Test
+  public void testInferedParallelism() throws TableNotExistException {
+    Assume.assumeTrue("The execute mode should  be streaming mode", isStreamingJob);

Review comment:
       This way, we wouldn't have to change so much code in this class. Maybe we could just add unit tests in TestFlinkScan.java.

##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -685,4 +782,60 @@ public void testSqlParseError() {
     AssertHelpers.assertThrows("The NaN is not supported by flink now. ",
         NumberFormatException.class, () -> sql(sqlParseErrorLTE));
   }
+
+  /**
+   * The sql can be executed in both streaming and batch mode, in order to get the parallelism, we convert the flink
+   * Table to flink DataStream, so we only use streaming mode here.
+   *
+   * @throws TableNotExistException table not exist exception
+   */
+  @Test
+  public void testInferedParallelism() throws TableNotExistException {
+    Assume.assumeTrue("The execute mode should  be streaming mode", isStreamingJob);

Review comment:
       Shouldn't inferParallelism only affect batch jobs (see FlinkSource#Builder#build)? If so, there's no reason to provide a unit test in streaming mode.
   
   In my mind, unit tests that check whether `inferParallelism()` returns the expected parallelism value are enough for this change. The ITCase seems to be validating the behavior of DataStreamSource#setParallelism, which we can assume is always correct because it's a basic Flink API.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -70,6 +73,7 @@ public static Builder forRowData() {
     private Table table;
     private TableLoader tableLoader;
     private TableSchema projectedSchema;
+    private ReadableConfig flinkConf;

Review comment:
       Users who write Flink batch jobs with the Java API will always pass a Flink `Configuration`, right? So how about defining this as `org.apache.flink.configuration.Configuration`?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -197,7 +206,7 @@ public FlinkInputFormat buildFormat() {
       TypeInformation<RowData> typeInfo = RowDataTypeInfo.of(FlinkSchemaUtil.convert(context.project()));
 
       if (!context.isStreaming()) {
-        return env.createInput(format, typeInfo);
+        return createInputDataStream(format, context, typeInfo);

Review comment:
       In this [comment](https://github.com/apache/iceberg/pull/1936#discussion_r554954082), I don't think I described things clearly. I meant that we could move `inferParallelism` into a separate method, which doesn't have to contain the DataStream construction or chaining methods.
   
   ```java
   private int inferParallelism(FlinkInputFormat format, ScanContext context) {
      // ....
   }
   ```
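
To illustrate why a separate method is easier to unit test, here is a rough sketch of the inference as a pure function. It is not the PR's implementation: the class name and the plain-value signature are hypothetical. In the PR the inputs would come from `flinkConf`, `format.createInputSplits(0)` and `context.limit()`; they are passed in directly here so the sketch has no Flink or Iceberg dependency.

```java
// Hypothetical, dependency-free sketch of the parallelism inference.
final class InferParallelismSketch {
  private InferParallelismSketch() {
  }

  static int inferParallelism(int defaultParallelism, boolean inferEnabled,
                              int maxInferParallelism, int splitCount, long limit) {
    int parallelism = defaultParallelism;
    if (inferEnabled) {
      if (maxInferParallelism < 1) {
        throw new IllegalStateException("max inferred parallelism cannot be less than 1");
      }
      // One subtask per split, capped at the configured maximum.
      parallelism = Math.min(splitCount, maxInferParallelism);
    }

    if (limit > 0) {
      // No point in running more subtasks than rows requested.
      int limitInt = limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit;
      parallelism = Math.min(parallelism, limitInt);
    }

    // Parallelism must be positive, even with zero splits.
    return Math.max(1, parallelism);
  }

  public static void main(String[] args) {
    System.out.println(inferParallelism(4, true, 100, 10, -1L)); // prints 10
    System.out.println(inferParallelism(4, true, 5, 10, -1L));   // prints 5
    System.out.println(inferParallelism(4, false, 100, 10, 3L)); // prints 3
  }
}
```

With this shape, the expected parallelism can be asserted directly, without building a DataStream or starting a job.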

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTableOptions.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class FlinkTableOptions {
+
+  private FlinkTableOptions() {
+  }
+
+  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM =
+      ConfigOptions.key("table.exec.iceberg.infer-source-parallelism").booleanType().defaultValue(true)

Review comment:
       Nit: it's clearer to put each call in the option definition on a separate line:
   
   ```java
         ConfigOptions.key("table.exec.iceberg.infer-source-parallelism")
             .booleanType()
             .defaultValue(true)
             .withDescription("If is false, parallelism of source are set by config.\n" +
                 "If is true, source parallelism is inferred according to splits number.\n");
   ```

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTableOptions.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class FlinkTableOptions {
+
+  private FlinkTableOptions() {
+  }
+
+  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM =
+      ConfigOptions.key("table.exec.iceberg.infer-source-parallelism").booleanType().defaultValue(true)
+          .withDescription("If is false, parallelism of source are set by config.\n" +
+              "If is true, source parallelism is inferred according to splits number.\n");
+
+  public static final ConfigOption<Integer> TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX =
+      ConfigOptions.key("table.exec.iceberg.infer-source-parallelism.max").intType().defaultValue(100)

Review comment:
       ditto

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -208,6 +217,30 @@ public FlinkInputFormat buildFormat() {
             .transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
       }
     }
+
+    private DataStream<RowData> createInputDataStream(FlinkInputFormat format, ScanContext context,
+                                                      TypeInformation<RowData> typeInfo) {
+      int parallelism = flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
+      if (flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
+        int max = flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);

Review comment:
       Nit: please use maxInferParallelism.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -208,6 +217,30 @@ public FlinkInputFormat buildFormat() {
             .transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
       }
     }
+
+    private DataStream<RowData> createInputDataStream(FlinkInputFormat format, ScanContext context,
+                                                      TypeInformation<RowData> typeInfo) {
+      int parallelism = flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
+      if (flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
+        int max = flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
+        Preconditions.checkState(max >= 1,
+            FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
+        int splitNum = 0;

Review comment:
       Nit: this assignment is redundant (flagged by IntelliJ).



