openinx commented on a change in pull request #1936:
URL: https://github.com/apache/iceberg/pull/1936#discussion_r562373214



##########
File path: 
flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
##########
@@ -106,6 +109,51 @@ public void testResiduals() throws Exception {
     assertRecords(runWithFilter(filter, "where dt='2020-03-20' and id=123"), 
expectedRecords, SCHEMA);
   }
 
+  @Test
+  public void testInferedParallelism() throws IOException {
+    Table table = catalog.createTable(TableIdentifier.of("default", "t"), 
SCHEMA, SPEC);
+
+    TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
+    FlinkInputFormat flinkInputFormat = 
FlinkSource.forRowData().tableLoader(tableLoader).table(table).buildFormat();
+    ScanContext scanContext = ScanContext.builder().build();
+
+    // Empty table ,parallelism at least 1

Review comment:
       Nit:  `inferred parallelism should be at least 1.`

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -208,6 +218,33 @@ public FlinkInputFormat buildFormat() {
             .transform(readerOperatorName, typeInfo, 
StreamingReaderOperator.factory(format));
       }
     }
+
+    int inferParallelism(FlinkInputFormat format, ScanContext context) {
+      int parallelism = 
readableConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
+      if 
(readableConfig.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM))
 {
+        int maxInterParallelism = 
readableConfig.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);

Review comment:
       Nit:  `maxInterParallelism` -> `maxInferParallelism`,  seems like it's a 
typo ? 

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -79,13 +84,20 @@ public boolean isBounded() {
 
   @Override
   public TableSource<RowData> projectFields(int[] fields) {
-    return new IcebergTableSource(loader, schema, properties, fields, 
isLimitPushDown, limit, filters);
+    return new IcebergTableSource(loader, schema, properties, fields, 
isLimitPushDown, limit, filters, readableConfig);
   }
 
   @Override
   public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) 
{
-    return 
FlinkSource.forRowData().env(execEnv).tableLoader(loader).project(getProjectedSchema()).limit(limit)
-        .filters(filters).properties(properties).build();
+    return FlinkSource.forRowData()

Review comment:
       Nit:  maybe we'd better also align the order here, as commented above. 

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -70,6 +73,7 @@ public static Builder forRowData() {
     private Table table;
     private TableLoader tableLoader;
     private TableSchema projectedSchema;
+    private ReadableConfig readableConfig;

Review comment:
       Should we provide a default `new Configuration()` for this variable?  
Otherwise, it will just throw an NPE if people forget to provide a `flinkConf` in 
FlinkSource#Builder, because we don't check for null in `inferParallelism`.

##########
File path: 
flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
##########
@@ -106,6 +109,51 @@ public void testResiduals() throws Exception {
     assertRecords(runWithFilter(filter, "where dt='2020-03-20' and id=123"), 
expectedRecords, SCHEMA);
   }
 
+  @Test
+  public void testInferedParallelism() throws IOException {
+    Table table = catalog.createTable(TableIdentifier.of("default", "t"), 
SCHEMA, SPEC);
+
+    TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
+    FlinkInputFormat flinkInputFormat = 
FlinkSource.forRowData().tableLoader(tableLoader).table(table).buildFormat();
+    ScanContext scanContext = ScanContext.builder().build();
+
+    // Empty table ,parallelism at least 1
+    int parallelism = FlinkSource.forRowData()
+        .flinkConf(new Configuration())
+        .inferParallelism(flinkInputFormat, scanContext);
+    Assert.assertEquals("Should produce the expected parallelism.", 1, 
parallelism);
+
+    List<Record> writeRecords = RandomGenericData.generate(SCHEMA, 2, 0L);
+    writeRecords.get(0).set(1, 123L);
+    writeRecords.get(0).set(2, "2020-03-20");
+    writeRecords.get(1).set(1, 456L);
+    writeRecords.get(1).set(2, "2020-03-20");
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, TEMPORARY_FOLDER);
+
+    DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), 
writeRecords);
+    DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0),
+        RandomGenericData.generate(SCHEMA, 2, 0L));
+    helper.appendToTable(dataFile1, dataFile2);
+
+    // Make sure to generate 2 CombinedScanTasks
+    long maxFileLen = Math.max(dataFile1.fileSizeInBytes(), 
dataFile2.fileSizeInBytes());
+    executeSQL(String
+        .format("ALTER TABLE t SET ('read.split.open-file-cost'='1', 
'read.split.target-size'='%s')", maxFileLen));
+
+    // 2 splits ,the parallelism is  2
+    parallelism = FlinkSource.forRowData()

Review comment:
       I think there are other test cases that we don't cover; it would be good 
to cover them:
   
   1.  table.exec.iceberg.infer-source-parallelism=false;
   2. table.exec.iceberg.infer-source-parallelism.max <= numberOfSplits;
   3. table.exec.iceberg.infer-source-parallelism.max > numberOfSplits;
   4. table.exec.iceberg.infer-source-parallelism.max > limit;
   5. table.exec.iceberg.infer-source-parallelism.max <= limit;
   
   Divide those cases into small methods if necessary.  

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -51,25 +52,29 @@
   private final TableLoader loader;
   private final TableSchema schema;
   private final Map<String, String> properties;
+  private final ReadableConfig readableConfig;

Review comment:
       Nit:  Let's move this line to line 60, so that the assignment order in the 
`IcebergTableSource` constructor aligns with these definitions. 

##########
File path: 
flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java
##########
@@ -106,6 +109,51 @@ public void testResiduals() throws Exception {
     assertRecords(runWithFilter(filter, "where dt='2020-03-20' and id=123"), 
expectedRecords, SCHEMA);
   }
 
+  @Test
+  public void testInferedParallelism() throws IOException {
+    Table table = catalog.createTable(TableIdentifier.of("default", "t"), 
SCHEMA, SPEC);
+
+    TableLoader tableLoader = TableLoader.fromHadoopTable(table.location());
+    FlinkInputFormat flinkInputFormat = 
FlinkSource.forRowData().tableLoader(tableLoader).table(table).buildFormat();
+    ScanContext scanContext = ScanContext.builder().build();
+
+    // Empty table ,parallelism at least 1
+    int parallelism = FlinkSource.forRowData()
+        .flinkConf(new Configuration())
+        .inferParallelism(flinkInputFormat, scanContext);
+    Assert.assertEquals("Should produce the expected parallelism.", 1, 
parallelism);
+
+    List<Record> writeRecords = RandomGenericData.generate(SCHEMA, 2, 0L);
+    writeRecords.get(0).set(1, 123L);
+    writeRecords.get(0).set(2, "2020-03-20");
+    writeRecords.get(1).set(1, 456L);
+    writeRecords.get(1).set(2, "2020-03-20");
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, 
fileFormat, TEMPORARY_FOLDER);
+
+    DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), 
writeRecords);
+    DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0),
+        RandomGenericData.generate(SCHEMA, 2, 0L));

Review comment:
       Will those randomly generated records be located in partition `2020-03-21`? 
I guess not. 

##########
File path: 
flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -137,7 +136,10 @@ public void testFilterPushDownEqual() {
     Assert.assertEquals("Should have 1 record", 1, result.size());
     Assert.assertArrayEquals("Should produce the expected record", 
expectRecord, result.get(0));
 
-    Assert.assertEquals("Should create only one scan", 1, scanEventCount);
+    // Because we add infer parallelism, all data files will be scanned first.
+    // Flink will call FlinkInputFormat#createInputSplits method to scan the 
data files,
+    // plus the operation to get the execution plan, so there are three scan 
event.
+    Assert.assertEquals("Should create 3 scans", 3, scanEventCount);

Review comment:
       We can disable `table.exec.iceberg.infer-source-parallelism` for 
all the batch tests by default, so that we don't have to change all the cases in 
this file.   Actually, we have written many unit tests which depend on the 
`parallelism`, for example this PR 
https://github.com/apache/iceberg/pull/2064.  Using the inferred parallelism for 
batch unit tests will introduce extra complexity and instability, so I 
recommend disabling parallelism inference in our batch unit tests by default: 
   
   ```diff
   diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java 
b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
   index 5b8e58cf..ab3d56ea 100644
   --- a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
   +++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
   @@ -62,10 +62,17 @@ public abstract class FlinkTestBase extends 
AbstractTestBase {
        if (tEnv == null) {
          synchronized (this) {
            if (tEnv == null) {
   -          this.tEnv = TableEnvironment.create(EnvironmentSettings
   +          EnvironmentSettings settings = EnvironmentSettings
                  .newInstance()
                  .useBlinkPlanner()
   -              .inBatchMode().build());
   +              .inBatchMode()
   +              .build();
   +
   +          TableEnvironment env = TableEnvironment.create(settings);
   +          env.getConfig().getConfiguration()
   +              
.set(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
   +
   +          tEnv = env;
            }
          }
        }
   ``` 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to